loolwsd/LOOLKit.cpp | 254 ++++++++-------------------------------------------- 1 file changed, 42 insertions(+), 212 deletions(-)
New commits: commit e0ff6eef6b636789b0f7e849df77a6047fec5bae Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Oct 9 17:18:29 2016 -0400 loolwsd: kill Connection object in child Change-Id: Ic4d0d3e7286272a0765b299824dfa3556fe56f4b Reviewed-on: https://gerrit.libreoffice.org/29652 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index 95d3f8c..852be7a 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -258,123 +258,6 @@ namespace #endif } -/// Connection thread with a client (via WSD). -class Connection: public Runnable -{ -public: - Connection(std::shared_ptr<ChildSession> session, - std::shared_ptr<WebSocket> ws) : - _sessionId(session->getId()), - _session(std::move(session)), - _ws(std::move(ws)), - _threadMutex(), - _joined(false) - { - Log::info("Connection ctor in child for " + _sessionId); - } - - ~Connection() - { - Log::info("~Connection dtor in child for " + _sessionId); - stop(); - join(); - } - - const std::string& getSessionId() const { return _sessionId; }; - std::shared_ptr<WebSocket> getWebSocket() const { return _ws; } - std::shared_ptr<ChildSession> getSession() { return _session; } - - void start() - { - _thread.start(*this); - - // Busy-wait until we run. - // This is important to make sure we can process - // callbacks, which if we're late to start will - // be dropped. No need for async notification here. - constexpr auto delay = COMMAND_TIMEOUT_MS / 20; - for (auto i = 0; i < 20 && !isRunning(); ++i) - { - std::this_thread::sleep_for(std::chrono::milliseconds(delay)); - } - } - - bool isRunning() - { - return _thread.isRunning(); - } - - void stop() - { - // What should we do here? - } - - void join() - { - // The thread is joinable only once. - std::unique_lock<std::mutex> lock(_threadMutex); - if (!_joined) - { - _thread.join(); - _joined = true; - } - } - - void run() override - { - Util::setThreadName("kit_ws_" + _sessionId); - - Log::debug("Thread started."); - try - { - IoUtil::SocketProcessor(_ws, - [this](const std::vector<char>& payload) - { - if (!_session->handleInput(payload.data(), payload.size())) - { - Log::info("Socket handler flagged for finishing."); - return false; - } - - return true; - }, - [this]() { _session->closeFrame(); }, - []() { return !!TerminationFlag; }); - - if (_session->isCloseFrame()) - { - Log::trace("Normal close handshake."); - _ws->shutdown(); - } - else - { - Log::trace("Abnormal close handshake."); - _ws->shutdown(WebSocket::WS_ENDPOINT_GOING_AWAY, SERVICE_UNAVALABLE_INTERNAL_ERROR); - } - } - catch (const Exception& exc) - { - Log::error() << "Connection::run: Exception: " << exc.displayText() - << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "") - << Log::end; - } - catch (const std::exception& exc) - { - Log::error(std::string("Connection::run: Exception: ") + exc.what()); - } - - Log::debug("Thread finished."); - } - -private: - const std::string _sessionId; - Thread _thread; - std::shared_ptr<ChildSession> _session; - std::shared_ptr<WebSocket> _ws; - std::mutex _threadMutex; - std::atomic<bool> _joined; -}; - /// A document container. /// Owns LOKitDocument instance and connections. /// Manages the lifetime of a document. @@ -464,10 +347,10 @@ public: if (!_sessions.emplace(sessionId, session).second) { - Log::error("Connection already exists for child: " + _jailId + ", session: " + sessionId); + Log::error("Session already exists for child: " + _jailId + ", session: " + sessionId); } - Log::debug("Connections: " + std::to_string(_sessions.size())); + Log::debug("Sessions: " + std::to_string(_sessions.size())); return true; } catch (const std::exception& ex) @@ -539,7 +422,7 @@ public: /// Returns true if at least one *live* connection exists. /// Does not consider user activity, just socket status. - bool hasConnections() + bool hasSessions() { // -ve values for failure. return purgeSessions() != 0; @@ -550,7 +433,7 @@ public: bool canDiscard() { //TODO: Implement proper time-out on inactivity. - return !hasConnections(); + return !hasSessions(); } /// Set Document password for given URL @@ -1296,7 +1179,7 @@ private: } else { - Log::error() << "Connection thread for session " << session->getId() << " for view " + Log::error() << "Session thread for session " << session->getId() << " for view " << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type) << "] payload [" << payload << "]." << Log::end; } commit e33eff4abdc4a3145b5bf7a9b6def2fce63d64a6 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Oct 9 17:05:24 2016 -0400 loolwsd: cleanup of Connection in ChildSession Change-Id: I07636163df7b2973dada55b9704abf7105ad285f Reviewed-on: https://gerrit.libreoffice.org/29651 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index 17f7f3f..95d3f8c 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -428,62 +428,21 @@ public: _tileQueue->put("eof"); _callbackThread.join(); - - // Flag all connections to stop. - for (auto aIterator : _connections) - { - aIterator.second->stop(); - } - - // Destroy all connections and views. - for (auto aIterator : _connections) - { - try - { - // stop all websockets - if (aIterator.second->isRunning()) - { - std::shared_ptr<WebSocket> ws = aIterator.second->getWebSocket(); - if (ws) - { - ws->shutdownReceive(); - aIterator.second->join(); - } - } - } - catch(NetException& exc) - { - Log::error() << "Document::~Document: " << exc.displayText() - << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "") - << Log::end; - } - } - - // Destroy all connections and views. - _connections.clear(); } const std::string& getUrl() const { return _url; } - bool createSession(const std::string& sessionId, const unsigned intSessionId) + bool createSession(const std::string& sessionId) { std::unique_lock<std::mutex> lock(_mutex); try { - const auto& it = _connections.find(intSessionId); - if (it != _connections.end()) + const auto& it = _sessions.find(sessionId); + if (it != _sessions.end()) { - // found item, check if still running - if (it->second->isRunning()) - { - Log::warn("Session [" + sessionId + "] is already running."); - return true; - } - - // Restore thread. TODO: Review this logic. - Log::warn("Session [" + sessionId + "] is not running. Restoring."); - _connections.erase(intSessionId); + Log::warn("Session [" + sessionId + "] is already running."); + return true; } Log::info() << "Creating " << (_clientViews ? "new" : "first") @@ -503,18 +462,12 @@ public: auto session = std::make_shared<ChildSession>(sessionId, ws, _jailId, *this); - auto thread = std::make_shared<Connection>(session, ws); - const auto aInserted = _connections.emplace(intSessionId, thread); - if (aInserted.second) - { - thread->start(); - } - else + if (!_sessions.emplace(sessionId, session).second) { Log::error("Connection already exists for child: " + _jailId + ", session: " + sessionId); } - Log::debug("Connections: " + std::to_string(_connections.size())); + Log::debug("Connections: " + std::to_string(_sessions.size())); return true; } catch (const std::exception& ex) @@ -531,7 +484,7 @@ public: { std::vector<std::shared_ptr<ChildSession>> deadSessions; size_t numRunning = 0; - size_t num_connections = 0; + size_t num_sessions = 0; { std::unique_lock<std::mutex> lock(_mutex, std::defer_lock); if (!lock.try_lock()) @@ -544,20 +497,20 @@ public: // bluntly exit, no need to clean up our own data structures. Also, there is a bug that // causes the deadSessions.clear() call below to crash in some situations when the last // session is being removed. - for (auto it = _connections.cbegin(); it != _connections.cend(); ++it) + for (auto it = _sessions.cbegin(); it != _sessions.cend(); ++it) { - if (it->second->isRunning()) + if (!it->second->isCloseFrame()) numRunning++; } if (numRunning > 0) { - for (auto it = _connections.cbegin(); it != _connections.cend(); ) + for (auto it = _sessions.cbegin(); it != _sessions.cend(); ) { - if (!it->second->isRunning()) + if (it->second->isCloseFrame()) { - deadSessions.push_back(it->second->getSession()); - it = _connections.erase(it); + deadSessions.push_back(it->second); + it = _sessions.erase(it); } else { @@ -566,7 +519,7 @@ public: } } - num_connections = _connections.size(); + num_sessions = _sessions.size(); } if (numRunning == 0) @@ -581,7 +534,7 @@ public: // and the dtor tries to take its lock (which is taken). deadSessions.clear(); - return num_connections; + return num_sessions; } /// Returns true if at least one *live* connection exists. @@ -1027,11 +980,11 @@ private: std::unique_lock<std::mutex> lock(_mutex); std::map<int, std::string> viewInfo; - for (auto& connection : _connections) + for (auto& pair : _sessions) { - if (connection.second->isRunning()) + const auto session = pair.second; + if (!session->isCloseFrame()) { - const auto session = connection.second->getSession(); const auto viewId = session->getViewId(); viewInfo[viewId] = session->getViewUserName(); } @@ -1083,15 +1036,12 @@ private: viewInfoArray->stringify(ossViewInfo); // Broadcast updated viewinfo to all _active_ connections - for (auto& connectionIt: _connections) + for (auto& pair : _sessions) { - if (connectionIt.second->isRunning()) + const auto session = pair.second; + if (!session->isCloseFrame() && session->isActive()) { - auto session = connectionIt.second->getSession(); - if (session->isActive()) - { - session->sendTextFrame("viewinfo: " + ossViewInfo.str()); - } + session->sendTextFrame("viewinfo: " + ossViewInfo.str()); } } } @@ -1105,15 +1055,14 @@ private: const std::string& renderOpts, const bool haveDocPassword) { - const unsigned intSessionId = Util::decodeId(sessionId); - const auto it = _connections.find(intSessionId); - if (it == _connections.end() || !it->second) + const auto it = _sessions.find(sessionId); + if (it == _sessions.end() || !it->second) { Log::error("Cannot find session [" + sessionId + "]."); return nullptr; } - auto session = it->second->getSession(); + auto session = it->second; int viewId = 0; std::unique_lock<std::mutex> lockLokDoc; @@ -1257,21 +1206,20 @@ private: Log::trace("Forwarding payload to " + prefix + ' ' + message); std::string name; - std::string value; - if (LOOLProtocol::parseNameValuePair(prefix, name, value, '-') && name == "child") + std::string viewId; + if (LOOLProtocol::parseNameValuePair(prefix, name, viewId, '-') && name == "child") { - const unsigned viewId = Util::decodeId(value); - const auto it = _connections.find(viewId); - if (it != _connections.end()) + const auto it = _sessions.find(viewId); + if (it != _sessions.end()) { if (message == "disconnect") { - Log::debug("Removing ChildSession " + value); - _connections.erase(it); + Log::debug("Removing ChildSession " + viewId); + _sessions.erase(it); return true; } - auto session = it->second->getSession(); + auto session = it->second; if (session) { return session->handleInput(message.data(), message.size()); @@ -1336,19 +1284,19 @@ private: // Forward the callback to the same view, demultiplexing is done by the LibreOffice core. // TODO: replace with a map to be faster. bool isFound = false; - for (auto& it : _connections) + for (auto& it : _sessions) { - auto session = it.second->getSession(); + auto session = it.second; if (session && ((session->getViewId() == viewId) || (viewId == -1))) { - if (it.second->isRunning()) + if (!it.second->isCloseFrame()) { isFound = true; session->loKitCallback(type, payload); } else { - Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view " + Log::error() << "Connection thread for session " << session->getId() << " for view " << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type) << "] payload [" << payload << "]." << Log::end; } @@ -1404,7 +1352,7 @@ private: std::condition_variable _cvLoading; std::atomic_size_t _isLoading; std::map<int, std::unique_ptr<CallbackDescriptor>> _viewIdToCallbackDescr; - std::map<unsigned, std::shared_ptr<Connection>> _connections; + std::map<std::string, std::shared_ptr<ChildSession>> _sessions; Poco::Thread _callbackThread; std::atomic_size_t _clientViews; }; @@ -1628,7 +1576,6 @@ void lokit_main(const std::string& childRoot, else if (tokens[0] == "session") { const std::string& sessionId = tokens[1]; - const unsigned intSessionId = Util::decodeId(sessionId); const std::string& docKey = tokens[2]; std::string url; @@ -1642,7 +1589,7 @@ void lokit_main(const std::string& childRoot, // Validate and create session. if (!(url == document->getUrl() && - document->createSession(sessionId, intSessionId))) + document->createSession(sessionId))) { Log::debug("CreateSession failed."); } _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits