loolwsd/LOOLWSD.cpp | 154 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 90 insertions(+), 64 deletions(-)
New commits: commit 74cb8d84d208b132842c9308cea22ed7122c8c93 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Apr 24 12:36:27 2016 -0400 loolwsd: exception-safe session and document management Sessions are referrenced in DocumentBroker instances, which themselves are referrenced in a container. When exceptions are thrown either while creating a new session, or during the lifetime of one, these references must be correctly cleaned up, otherwise we introduce internal instability in addition to stalling the client. Change-Id: I3177e45564860897528da6d7fbcbe346d3bd1c75 Reviewed-on: https://gerrit.libreoffice.org/24338 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index ea40a49..a40edc8 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -498,8 +498,6 @@ private: const auto uriPublic = DocumentBroker::sanitizeURI(uri); const auto docKey = DocumentBroker::getDocKey(uriPublic); std::shared_ptr<DocumentBroker> docBroker; - // This lock could become a bottleneck. - // In that case, we can use a pool and index by publicPath. std::unique_lock<std::mutex> docBrokersLock(docBrokersMutex); // Lookup this document. @@ -510,17 +508,20 @@ private: Log::debug("Found DocumentBroker for docKey [" + docKey + "]."); docBroker = it->second; assert(docBroker); + } + docBrokersLock.unlock(); + if (docBroker) + { // If this document is going out, wait. if (docBroker->isMarkedToDestroy()) { - Log::debug("Document [" + docKey + "] is marked to destroy, waiting to load."); + Log::debug("Document [" + docKey + "] is marked to destroy, waiting to reload."); const auto timeout = POLL_TIMEOUT_MS / 2; for (size_t i = 0; i < COMMAND_TIMEOUT_MS / timeout; ++i) { - docBrokersLock.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(timeout)); - docBrokersLock.lock(); + std::unique_lock<std::mutex> lock(docBrokersMutex); if (docBrokers.find(docKey) == docBrokers.end()) { docBroker.reset(); @@ -531,102 +532,127 @@ private: if (docBroker) { // Still here, but marked to destroy. - throw std::runtime_error("Cannot load a view to document while unloading."); + Log::error("Timed out while waiting for document to unload befor loading. Service Unavailable."); + throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); } } } + bool newDoc = false; if (!docBroker) { + newDoc = true; // Request a kit process for this doc. auto child = getNewChild(); if (!child) { // Let the client know we can't serve now. - Log::error("Failed to get new child. Client cannot serve now."); + Log::error("Failed to get new child. Service Unavailable."); throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); } // Set one we just created. Log::debug("New DocumentBroker for docKey [" + docKey + "]."); docBroker = std::make_shared<DocumentBroker>(uriPublic, docKey, LOOLWSD::ChildRoot, child); - docBrokers.emplace(docKey, docBroker); + } + + // Validate the broker. + if (!docBroker || !docBroker->isAlive()) + { + Log::error("DocBroker is invalid or child had SDS. Service Unavailable."); + if (!newDoc) + { + // Remove. + std::unique_lock<std::mutex> lock(docBrokersMutex); + docBrokers.erase(docKey); + throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); + } } // Validate the URI and Storage before moving on. docBroker->validate(uriPublic); Log::debug("Validated [" + uriPublic.toString() + "]."); - // For ToClient sessions, we store incoming messages in a queue and have a separate - // thread that handles them. This is so that we can empty the queue when we get a - // "canceltiles" message. - auto queue = std::make_shared<BasicTileQueue>(); - auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, queue); - auto sessionsCount = docBroker->addSession(session); - docBrokersLock.unlock(); - Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount)); - - // indicator to a client that is waiting to connect to lokit process - status = "statusindicator: connect"; - ws->sendFrame(status.data(), (int) status.size()); - - if (!waitBridgeCompleted(session, docBroker)) + if (newDoc) { - // Let the client know we can't serve now. - Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now."); - throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); + std::unique_lock<std::mutex> lock(docBrokersMutex); + docBrokers.emplace(docKey, docBroker); } - // Now the bridge beetween the client and kit process is connected - // Let messages flow - status = "statusindicator: ready"; - ws->sendFrame(status.data(), (int) status.size()); - - QueueHandler handler(queue, session, "wsd_queue_" + session->getId()); + // Above this point exceptions are safe and will auto-cleanup. + // Below this, we need to cleanup internal references. + std::shared_ptr<MasterProcessSession> session; + try + { + // For ToClient sessions, we store incoming messages in a queue and have a separate + // thread to pump them. This is to empty the queue when we get a "canceltiles" message. + auto queue = std::make_shared<BasicTileQueue>(); + session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker, queue); + const auto sessionsCount = docBroker->addSession(session); + Log::trace(docKey + ", ws_sessions++: " + std::to_string(sessionsCount)); + + // indicator to a client that is waiting to connect to lokit process + status = "statusindicator: connect"; + ws->sendFrame(status.data(), (int) status.size()); + + if (!waitBridgeCompleted(session, docBroker)) + { + // Let the client know we can't serve now. + Log::error(session->getName() + ": Failed to connect to lokit process. Client cannot serve now."); + throw WebSocketErrorMessageException(SERVICE_UNAVALABLE_INTERNAL_ERROR); + } - Thread queueHandlerThread; - queueHandlerThread.start(handler); + // Now the bridge beetween the client and kit process is connected + // Let messages flow + status = "statusindicator: ready"; + ws->sendFrame(status.data(), (int) status.size()); - IoUtil::SocketProcessor(ws, - [&queue](const std::vector<char>& payload) - { - queue->put(payload); - return true; - }, - [&session]() { session->closeFrame(); }, - [&queueHandlerThread]() { return TerminationFlag && queueHandlerThread.isRunning(); }); + QueueHandler handler(queue, session, "wsd_queue_" + session->getId()); + Thread queueHandlerThread; + queueHandlerThread.start(handler); - docBrokersLock.lock(); - const bool canDestroy = docBroker->canDestroy(); - docBrokersLock.unlock(); + IoUtil::SocketProcessor(ws, + [&queue](const std::vector<char>& payload) + { + queue->put(payload); + return true; + }, + [&session]() { session->closeFrame(); }, + [&queueHandlerThread]() { return TerminationFlag && queueHandlerThread.isRunning(); }); - if (canDestroy && !session->_bLoadError) - { - Log::info("Shutdown of the last session, saving the document before tearing down."); - - // Use auto-save to save only when there are modifications since last save. - // We also need to wait until the save notification reaches us - // and Storage persists the document. - // Note: technically, there is a race between these two (we should - // hold the broker lock before issueing the save and waiting,) - // but in practice this shouldn't happen. - if (docBroker->autoSave(true) && !docBroker->waitSave(COMMAND_TIMEOUT_MS)) + const bool canDestroy = docBroker->canDestroy(); + if (canDestroy && !session->_bLoadError) { - Log::error("Auto-save before closing failed."); + Log::info("Shutdown of the last session, saving the document before tearing down."); + + // Use auto-save to save only when there are modifications since last save. + // We also need to wait until the save notification reaches us + // and Storage persists the document. + // Note: technically, there is a race between these two (we should + // hold the broker lock before issueing the save and waiting,) + // but in practice this shouldn't happen. + if (docBroker->autoSave(true) && !docBroker->waitSave(COMMAND_TIMEOUT_MS)) + { + Log::error("Auto-save before closing failed."); + } } + else + { + Log::info("Clearing the queue."); + queue->clear(); + } + + Log::info("Finishing GET request handler for session [" + id + "]. Joining the queue."); + queue->put("eof"); + queueHandlerThread.join(); } - else + catch (const std::exception& exc) { - Log::info("Clearing the queue."); - queue->clear(); + Log::error("Error in client request handler: " + std::string(exc.what())); } - Log::info("Finishing GET request handler for session [" + id + "]. Joining the queue."); - queue->put("eof"); - queueHandlerThread.join(); - docBrokersLock.lock(); - sessionsCount = docBroker->removeSession(id); + const auto sessionsCount = docBroker->removeSession(id); Log::trace(docKey + ", ws_sessions--: " + std::to_string(sessionsCount)); if (sessionsCount == 0) { _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits