wsd/Admin.cpp | 36 ++++++++++++----------------- wsd/Admin.hpp | 5 +--- wsd/AdminModel.cpp | 60 ++++++++++++++++++++++++++++++++++++++++++++----- wsd/AdminModel.hpp | 12 ++++++++- wsd/DocumentBroker.cpp | 7 ++++- wsd/DocumentBroker.hpp | 3 ++ wsd/LOOLWSD.cpp | 7 ++++- 7 files changed, 97 insertions(+), 33 deletions(-)
New commits: commit 0806986c8cd8147b925aeb167ff0708fac6f9725 Author: Michael Meeks <michael.me...@collabora.com> Date: Mon Apr 3 18:21:20 2017 +0100 Admin model locking - major cleanup. Do everything in the Admin Model in the AdminPoll thread. Everything else can push work there safely through callbacks. diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp index 4ab3d821..78da7ac1 100644 --- a/wsd/Admin.cpp +++ b/wsd/Admin.cpp @@ -57,7 +57,6 @@ void AdminSocketHandler::handleMessage(bool /* fin */, WSOpCode /* code */, return; } - std::unique_lock<std::mutex> modelLock(_admin->getLock()); AdminModel& model = _admin->getModel(); if (tokens[0] == "auth") @@ -262,14 +261,13 @@ bool AdminSocketHandler::handleInitialRequest( Admin &admin = Admin::instance(); auto handler = std::make_shared<AdminSocketHandler>(&admin, socketWeak, request); socket->setHandler(handler); - - { // FIXME: weird locking around subscribe ... - std::unique_lock<std::mutex> modelLock(admin.getLock()); - // Subscribe the websocket of any AdminModel updates - AdminModel& model = admin.getModel(); + admin.addCallback([handler, sessionId] + { + Admin &adminIn = Admin::instance(); + AdminModel& model = adminIn.getModel(); handler->_sessionId = sessionId; model.subscribe(sessionId, handler); - } + }); return true; } @@ -293,7 +291,6 @@ Admin::Admin() : { LOG_INF("Admin ctor."); - std::unique_lock<std::mutex> modelLock(getLock()); const auto totalMem = getTotalMemoryUsage(); LOG_TRC("Total memory used: " << totalMem); _model.addMemStats(totalMem); @@ -308,6 +305,8 @@ void Admin::pollingThread() { std::chrono::steady_clock::time_point lastCPU, lastMem; + _model.setThreadOwner(std::this_thread::get_id()); + lastCPU = std::chrono::steady_clock::now(); lastMem = lastCPU; @@ -326,7 +325,6 @@ void Admin::pollingThread() std::chrono::duration_cast<std::chrono::milliseconds>(now - lastCPU).count(); if (memWait <= 0) { - std::unique_lock<std::mutex> modelLock(getLock()); const auto totalMem = getTotalMemoryUsage(); if (totalMem != _lastTotalMemory) { @@ -349,21 +347,20 @@ void Admin::pollingThread() void Admin::addDoc(const std::string& docKey, Poco::Process::PID pid, const std::string& filename, const std::string& sessionId) { - std::unique_lock<std::mutex> modelLock(_modelMutex); - _model.addDocument(docKey, pid, filename, sessionId); + addCallback([this, docKey, pid, filename, sessionId] + { _model.addDocument(docKey, pid, filename, sessionId); }); } void Admin::rmDoc(const std::string& docKey, const std::string& sessionId) { - std::unique_lock<std::mutex> modelLock(_modelMutex); - _model.removeDocument(docKey, sessionId); + addCallback([this, docKey, sessionId] + { _model.removeDocument(docKey, sessionId); }); } void Admin::rmDoc(const std::string& docKey) { - std::unique_lock<std::mutex> modelLock(_modelMutex); LOG_INF("Removing complete doc [" << docKey << "] from Admin."); - _model.removeDocument(docKey); + addCallback([this, docKey]{ _model.removeDocument(docKey); }); } void Admin::rescheduleMemTimer(unsigned interval) @@ -382,8 +379,6 @@ void Admin::rescheduleCpuTimer(unsigned interval) unsigned Admin::getTotalMemoryUsage() { - Util::assertIsLocked(_modelMutex); - // To simplify and clarify this; since load, link and pre-init all // inside the forkit - we should account all of our fixed cost of // memory to the forkit; and then count only dirty pages in the clients @@ -413,14 +408,13 @@ AdminModel& Admin::getModel() void Admin::updateLastActivityTime(const std::string& docKey) { - std::unique_lock<std::mutex> modelLock(_modelMutex); - _model.updateLastActivityTime(docKey); + addCallback([this, docKey]{ _model.updateLastActivityTime(docKey); }); } void Admin::updateMemoryDirty(const std::string& docKey, int dirty) { - std::unique_lock<std::mutex> modelLock(_modelMutex); - _model.updateMemoryDirty(docKey, dirty); + addCallback([this, docKey, dirty] + { _model.updateMemoryDirty(docKey, dirty); }); } void Admin::dumpState(std::ostream& os) diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp index a816b636..033619a7 100644 --- a/wsd/Admin.hpp +++ b/wsd/Admin.hpp @@ -96,16 +96,15 @@ public: void rescheduleCpuTimer(unsigned interval); - std::unique_lock<std::mutex> getLock() { return std::unique_lock<std::mutex>(_modelMutex); } - void updateLastActivityTime(const std::string& docKey); void updateMemoryDirty(const std::string& docKey, int dirty); void dumpState(std::ostream& os) override; private: + /// The model is accessed only during startup & in + /// the Admin Poll thread. AdminModel _model; - std::mutex _modelMutex; int _forKitPid; long _lastTotalMemory; diff --git a/wsd/AdminModel.cpp b/wsd/AdminModel.cpp index 5e1aefc7..1f3e6dcc 100644 --- a/wsd/AdminModel.cpp +++ b/wsd/AdminModel.cpp @@ -94,8 +94,26 @@ void Subscriber::unsubscribe(const std::string& command) _subscriptions.erase(command); } +bool AdminModel::isCorrectThread() const +{ +#if ENABLE_DEBUG + // FIXME: share this code [!] + const bool sameThread = std::this_thread::get_id() == _owner; + if (!sameThread) + LOG_WRN("Admin command invoked from foreign thread. Expected: 0x" << std::hex << + _owner << " but called from 0x" << std::this_thread::get_id() << " (" << + std::dec << Util::getThreadId() << ")."); + + return sameThread; +#else + return true; +#endif +} + std::string AdminModel::query(const std::string& command) { + assert (isCorrectThread()); + const auto token = LOOLProtocol::getFirstToken(command); if (token == "documents") { @@ -132,6 +150,8 @@ std::string AdminModel::query(const std::string& command) /// Returns memory consumed by all active loolkit processes unsigned AdminModel::getKitsMemoryUsage() { + assert (isCorrectThread()); + unsigned totalMem = 0; unsigned docs = 0; for (const auto& it : _documents) @@ -158,6 +178,8 @@ unsigned AdminModel::getKitsMemoryUsage() void AdminModel::subscribe(int sessionId, const std::weak_ptr<WebSocketHandler>& ws) { + assert (isCorrectThread()); + const auto ret = _subscribers.emplace(sessionId, Subscriber(sessionId, ws)); if (!ret.second) { @@ -167,6 +189,8 @@ void AdminModel::subscribe(int sessionId, const std::weak_ptr<WebSocketHandler>& void AdminModel::subscribe(int sessionId, const std::string& command) { + assert (isCorrectThread()); + auto subscriber = _subscribers.find(sessionId); if (subscriber != _subscribers.end()) { @@ -176,37 +200,39 @@ void AdminModel::subscribe(int sessionId, const std::string& command) void AdminModel::unsubscribe(int sessionId, const std::string& command) { + assert (isCorrectThread()); + auto subscriber = _subscribers.find(sessionId); if (subscriber != _subscribers.end()) - { subscriber->second.unsubscribe(command); - } } void AdminModel::addMemStats(unsigned memUsage) { + assert (isCorrectThread()); + _memStats.push_back(memUsage); if (_memStats.size() > _memStatsSize) - { _memStats.pop_front(); - } notify("mem_stats " + std::to_string(memUsage)); } void AdminModel::addCpuStats(unsigned cpuUsage) { + assert (isCorrectThread()); + _cpuStats.push_back(cpuUsage); if (_cpuStats.size() > _cpuStatsSize) - { _cpuStats.pop_front(); - } notify("cpu_stats " + std::to_string(cpuUsage)); } void AdminModel::setCpuStatsSize(unsigned size) { + assert (isCorrectThread()); + int wasteValuesLen = _cpuStats.size() - size; while (wasteValuesLen-- > 0) { @@ -219,6 +245,8 @@ void AdminModel::setCpuStatsSize(unsigned size) void AdminModel::setMemStatsSize(unsigned size) { + assert (isCorrectThread()); + int wasteValuesLen = _memStats.size() - size; while (wasteValuesLen-- > 0) { @@ -231,6 +259,8 @@ void AdminModel::setMemStatsSize(unsigned size) void AdminModel::notify(const std::string& message) { + assert (isCorrectThread()); + if (!_subscribers.empty()) { LOG_TRC("Message to admin console: " << message); @@ -251,6 +281,8 @@ void AdminModel::notify(const std::string& message) void AdminModel::addDocument(const std::string& docKey, Poco::Process::PID pid, const std::string& filename, const std::string& sessionId) { + assert (isCorrectThread()); + const auto ret = _documents.emplace(docKey, Document(docKey, pid, filename)); ret.first->second.addView(sessionId); LOG_DBG("Added admin document [" << docKey << "]."); @@ -289,6 +321,8 @@ void AdminModel::addDocument(const std::string& docKey, Poco::Process::PID pid, void AdminModel::removeDocument(const std::string& docKey, const std::string& sessionId) { + assert (isCorrectThread()); + auto docIt = _documents.find(docKey); if (docIt != _documents.end() && !docIt->second.isExpired()) { @@ -311,6 +345,8 @@ void AdminModel::removeDocument(const std::string& docKey, const std::string& se void AdminModel::removeDocument(const std::string& docKey) { + assert (isCorrectThread()); + auto docIt = _documents.find(docKey); if (docIt != _documents.end()) { @@ -332,6 +368,8 @@ void AdminModel::removeDocument(const std::string& docKey) std::string AdminModel::getMemStats() { + assert (isCorrectThread()); + std::ostringstream oss; for (const auto& i: _memStats) { @@ -343,6 +381,8 @@ std::string AdminModel::getMemStats() std::string AdminModel::getCpuStats() { + assert (isCorrectThread()); + std::ostringstream oss; for (const auto& i: _cpuStats) { @@ -354,6 +394,8 @@ std::string AdminModel::getCpuStats() unsigned AdminModel::getTotalActiveViews() { + assert (isCorrectThread()); + unsigned numTotalViews = 0; for (const auto& it: _documents) { @@ -368,6 +410,8 @@ unsigned AdminModel::getTotalActiveViews() std::string AdminModel::getDocuments() const { + assert (isCorrectThread()); + std::ostringstream oss; for (const auto& it: _documents) { @@ -389,6 +433,8 @@ std::string AdminModel::getDocuments() const void AdminModel::updateLastActivityTime(const std::string& docKey) { + assert (isCorrectThread()); + auto docIt = _documents.find(docKey); if (docIt != _documents.end()) { @@ -410,6 +456,8 @@ bool Document::updateMemoryDirty(int dirty) void AdminModel::updateMemoryDirty(const std::string& docKey, int dirty) { + assert (isCorrectThread()); + auto docIt = _documents.find(docKey); if (docIt != _documents.end() && docIt->second.updateMemoryDirty(dirty)) diff --git a/wsd/AdminModel.hpp b/wsd/AdminModel.hpp index 8250687c..57bd702b 100644 --- a/wsd/AdminModel.hpp +++ b/wsd/AdminModel.hpp @@ -138,7 +138,8 @@ private: class AdminModel { public: - AdminModel() + AdminModel() : + _owner(std::this_thread::get_id()) { LOG_INF("AdminModel ctor."); } @@ -148,6 +149,12 @@ public: LOG_INF("AdminModel dtor."); } + /// All methods here must be called from the Admin socket-poll + void setThreadOwner(const std::thread::id &id) { _owner = id; } + + /// In debug mode check that code is running in the correct thread. + bool isCorrectThread() const; + std::string query(const std::string& command); /// Returns memory consumed by all active loolkit processes @@ -199,6 +206,9 @@ private: std::list<unsigned> _cpuStats; unsigned _cpuStatsSize = 100; + + // always enabled to avoid ABI change in debug mode ... + std::thread::id _owner; }; #endif commit 94022e90d9f76b71c2dfde711188736d4c9f8b08 Author: Michael Meeks <michael.me...@collabora.com> Date: Fri Mar 31 20:58:33 2017 +0100 Join threads to force a reasonably sensible shutdown sequence. ie. actually wait until documents are saved and sessions closed. diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index aab632bb..483e7c5e 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -295,6 +295,11 @@ DocumentBroker::~DocumentBroker() _childProcess.reset(); } +void DocumentBroker::joinThread() +{ + _poll->joinThread(); +} + bool DocumentBroker::load(const std::shared_ptr<ClientSession>& session, const std::string& jailId) { assert(isCorrectThread()); diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp index 91dfb63a..168734cc 100644 --- a/wsd/DocumentBroker.hpp +++ b/wsd/DocumentBroker.hpp @@ -225,6 +225,9 @@ public: //TODO: Take reason to broadcast to clients. void stop() { _stop = true; } + /// Thread safe termination of this broker if it has a lingering thread + void joinThread(); + /// Loads a document from the public URI into the jail. bool load(const std::shared_ptr<ClientSession>& session, const std::string& jailId); bool isLoaded() const { return _isLoaded; } diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index a47a3511..1ac9efa2 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -2179,6 +2179,8 @@ public: { _stop = true; SocketPoll::wakeupWorld(); + _acceptPoll.joinThread(); + WebServerPoll.joinThread(); } void dumpState(std::ostream& os) @@ -2458,10 +2460,11 @@ int LOOLWSD::innerMain() // Wait until documents are saved and sessions closed. srv.stop(); - WebServerPoll.stop(); // atexit handlers tend to free Admin before Documents LOG_INF("Cleaning up lingering documents."); + for (auto& docBrokerIt : DocBrokers) + docBrokerIt.second->joinThread(); DocBrokers.clear(); #ifndef KIT_IN_PROCESS @@ -2470,6 +2473,8 @@ int LOOLWSD::innerMain() SigUtil::killChild(ForKitProcId); #endif + PrisonerPoll.joinThread(); + // Terminate child processes LOG_INF("Requesting child processes to terminate."); for (auto& child : NewChildren) commit 3e1351ec79d5a5804f15ce32f3324ceb93750f47 Author: Michael Meeks <michael.me...@collabora.com> Date: Fri Mar 31 17:55:26 2017 +0100 Correct obsolete method name. diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 910f328f..aab632bb 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -759,7 +759,7 @@ size_t DocumentBroker::addSession(const std::shared_ptr<ClientSession>& session) throw; } - // Below values are recalculated when startDestroy() is called (before destroying the + // Below values are recalculated when destroyIfLastEditor() is called (before destroying the // document). It is safe to reset their values to their defaults whenever a new session is added. _lastEditableSession = false; _markToDestroy = false; _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits