loolwsd/IoUtil.cpp | 19 ++++++---- loolwsd/IoUtil.hpp | 2 - loolwsd/LOOLBroker.cpp | 73 +++++---------------------------------- loolwsd/LOOLKit.cpp | 18 ++++----- loolwsd/LOOLWSD.cpp | 12 +++--- loolwsd/MasterProcessSession.cpp | 8 +++- 6 files changed, 46 insertions(+), 86 deletions(-)
New commits: commit fe69c4d5b647d0191748956ffc2c5255b914b858 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Mon Mar 28 16:07:02 2016 -0400 loolwsd: pipe plumbing cleanup Change-Id: I5519235a4601e1e38cedc3f06ffe9386434a292d Reviewed-on: https://gerrit.libreoffice.org/23645 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp index b2bd61e..c7cc54c 100644 --- a/loolwsd/IoUtil.cpp +++ b/loolwsd/IoUtil.cpp @@ -207,14 +207,14 @@ void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws) ssize_t writeFIFO(int pipe, const char* buffer, ssize_t size) { - ssize_t bytes = -1; ssize_t count = 0; - while(true) { - bytes = write(pipe, buffer + count, size - count); + Log::trace("Writing to pipe. Data: [" + std::string(buffer, size) + "]."); + const auto bytes = write(pipe, buffer + count, size - count); if (bytes < 0) { + Log::error("Failed to write to pipe. Retrying. Data: [" + std::string(buffer, size) + "]."); if (errno == EINTR || errno == EAGAIN) continue; @@ -281,6 +281,8 @@ int PipeReader::readLine(std::string& line, // We have a line cached, return it. line += std::string(_data.data(), endOfLine); _data.erase(0, endOfLine - _data.data() + 1); // Including the '\n'. + Log::trace() << "Read existing line from pipe: " << _name << ", line: [" + << line << "], data: [" << _data << "]." << Log::end; return 1; } @@ -291,7 +293,7 @@ int PipeReader::readLine(std::string& line, { if (stopPredicate()) { - Log::info() << "Spot requested for pipe: " << _name << Log::end; + Log::info() << "Stop requested for pipe: " << _name << '.' << Log::end; return -1; } @@ -300,6 +302,7 @@ int PipeReader::readLine(std::string& line, pipe.events = POLLIN; pipe.revents = 0; const int ready = poll(&pipe, 1, pollTimeoutMs); + Log::trace() << "Poll for pipe: " << _name << " returned: " << ready << Log::end; if (ready == 0) { // Timeout. @@ -314,6 +317,7 @@ int PipeReader::readLine(std::string& line, { char buffer[READ_BUFFER_SIZE]; const auto bytes = readFIFO(_pipe, buffer, sizeof(buffer)); + Log::trace() << "readFIFO for pipe: " << _name << " returned: " << bytes << Log::end; if (bytes < 0) { return -1; @@ -324,15 +328,18 @@ int PipeReader::readLine(std::string& line, { // Got end of line. line = _data; - auto tail = std::string(static_cast<const char*>(buffer), endOfLine); + const auto tail = std::string(static_cast<const char*>(buffer), endOfLine); line += tail; - _data = std::string(endOfLine, bytes - tail.size() - 1); // Exclude the '\n'. + _data = std::string(endOfLine + 1, bytes - tail.size() - 1); // Exclude the '\n'. + Log::trace() << "Read line from pipe: " << _name << ", line: [" << line + << "], data: [" << _data << "]." << Log::end; return 1; } else { // More data, keep going. _data += std::string(buffer, bytes); + Log::trace() << "data appended to pipe: " << _name << ", data: " << _data << Log::end; } } else if (pipe.revents & (POLLERR | POLLHUP | POLLNVAL)) diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp index c7a4e00..016a808 100644 --- a/loolwsd/IoUtil.hpp +++ b/loolwsd/IoUtil.hpp @@ -59,7 +59,7 @@ namespace IoUtil /// On success, line will contain the read message. int readLine(std::string& line, std::function<bool()> stopPredicate, - const size_t timeoutMs); + const size_t timeoutMs = POLL_TIMEOUT_MS); void process(std::function<bool(std::string& message)> handler, std::function<bool()> stopPredicate, diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp index a551365..fdb43a3 100644 --- a/loolwsd/LOOLBroker.cpp +++ b/loolwsd/LOOLBroker.cpp @@ -184,65 +184,14 @@ namespace class PipeRunnable: public Runnable { public: - PipeRunnable() - : _start(nullptr), - _end(nullptr) + PipeRunnable() : + _childPipeReader("child_pipe_rd", readerChild) { } - ssize_t getResponseLine(const int pipeReader, std::string& response) - { - ssize_t bytes = -1; - response.clear(); - - try - { - for (;;) - { - if (_start == _end) - { - bytes = IoUtil::readMessage(pipeReader, _buffer, sizeof(_buffer)); - if (bytes < 0) - { - _start = _end = nullptr; - break; - } - - _start = _buffer; - _end = _buffer + bytes; - } - - if (_start != _end) - { - char byteChar = *_start++; - while (_start != _end && byteChar != '\r' && byteChar != '\n') - { - response += byteChar; - byteChar = *_start++; - } - - if (byteChar == '\r' && *_start == '\n') - { - ++_start; - break; - } - } - } - } - catch (const std::exception& exc) - { - Log::error() << "Exception while reading from pipe [" - << pipeReader << "]: " << exc.what() << Log::end; - return -1; - } - - Log::debug("Recv child response: [" + response + "]."); - return bytes; - } - bool createSession(const Process::PID pid, const std::string& session, const std::string& url) { - const std::string message = "session " + session + " " + url + "\r\n"; + const std::string message = "session " + session + " " + url + "\n"; if (IoUtil::writeFIFO(getChildPipe(pid), message) < 0) { Log::error("Error sending session message to child [" + std::to_string(pid) + "]."); @@ -250,7 +199,7 @@ public: } std::string response; - if (getResponseLine(readerChild, response) < 0) + if (_childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0) { Log::error("Error reading response to session message from child [" + std::to_string(pid) + "]."); return false; @@ -268,10 +217,10 @@ public: size_t empty_count = 0; for (auto it = _childProcesses.begin(); it != _childProcesses.end(); ) { - const auto message = "query url \r\n"; + const auto message = "query url\n"; std::string response; if (IoUtil::writeFIFO(it->second->getWritePipe(), message) < 0 || - getResponseLine(readerChild, response) < 0) + _childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0) { auto log = Log::error(); log << "Error querying child [" << std::to_string(it->second->getPid()) << "]."; @@ -288,7 +237,7 @@ public: } StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - if (tokens.count() == 2 && tokens[0] == std::to_string(it->second->getPid())) + if (tokens.count() >= 2 && tokens[0] == std::to_string(it->second->getPid())) { Log::debug("Child [" + std::to_string(it->second->getPid()) + "] hosts [" + tokens[1] + "]."); if (tokens[1] == "empty") @@ -304,7 +253,7 @@ public: else { Log::error("Unexpected response from child [" + std::to_string(it->second->getPid()) + - "] to query: [" + tokens[1] + "]."); + "] to url query: [" + response + "]."); } ++it; @@ -317,6 +266,8 @@ public: void handleInput(const std::string& message) { + Log::info("Broker command: [" + message + "]."); + StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); std::lock_guard<std::mutex> lock(forkMutex); @@ -388,9 +339,7 @@ public: } private: - char* _start; - char* _end; - char _buffer[READ_BUFFER_SIZE]; + IoUtil::PipeReader _childPipeReader; }; /// Initializes LibreOfficeKit for cross-fork re-use. diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index 299af4d..2bce3e8 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -997,15 +997,14 @@ void lokit_main(const std::string& childRoot, if (start != end) { char byteChar = *start++; - while (start != end && byteChar != '\r' && byteChar != '\n') + while (start != end && byteChar != '\n') { message += byteChar; byteChar = *start++; } - if (byteChar == '\r' && *start == '\n') + if (byteChar == '\n') { - start++; Log::trace("Recv: " + message); StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); auto response = std::to_string(Process::id()) + " "; @@ -1013,7 +1012,7 @@ void lokit_main(const std::string& childRoot, if (TerminationFlag) { // Too late, we're going down. - response += "down \r\n"; + response += "down\n"; } else if (tokens[0] == "session") { @@ -1034,29 +1033,29 @@ void lokit_main(const std::string& childRoot, if (url == document->getUrl() && document->createSession(sessionId, intSessionId)) { - response += "ok \r\n"; + response += "ok\n"; } else { - response += "bad \r\n"; + response += "bad\n"; } } else if (document && document->canDiscard()) { TerminationFlag = true; - response += "down \r\n"; + response += "down\n"; } else if (tokens[0] == "query" && tokens.count() > 1) { if (tokens[1] == "url") { response += (document ? document->getUrl() : "empty"); - response += " \r\n"; + response += "\n"; } } else { - response += "bad unknown token [" + tokens[0] + "] \r\n"; + response += "bad unknown token [" + tokens[0] + "]\n"; } IoUtil::writeFIFO(writerBroker, response); @@ -1064,7 +1063,6 @@ void lokit_main(const std::string& childRoot, // Don't log the CR LF at end assert(response.length() > 2); assert(response[response.length()-1] == '\n'); - assert(response[response.length()-2] == '\r'); Log::trace("KitToBroker: " + response.substr(0, response.length()-2)); message.clear(); } diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 650b1e1..db673cc 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -272,6 +272,7 @@ private: session->handleInput(saveas.data(), saveas.size()); // Send it back to the client. + //TODO: Should have timeout to avoid waiting forever. Poco::URI resultURL(session->getSaveAs()); if (!resultURL.getPath().empty()) { @@ -459,8 +460,8 @@ private: sessionsLock.unlock(); // Request a kit process for this doc. - const std::string aMessage = "request " + id + " " + docKey + "\r\n"; - Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 2)); + const std::string aMessage = "request " + id + " " + docKey + "\n"; + Log::debug("MasterToBroker: " + aMessage.substr(0, aMessage.length() - 1)); IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage); // For ToClient sessions, we store incoming messages in a queue and have a separate @@ -495,8 +496,9 @@ private: if (docBroker->getSessionsCount() == 1 && !normalShutdown) { - //TODO: This really should move to the kit, where it - // knows if a doc is unsaved, and if other views are open. + //TODO: This isn't this simple. We need to wait for the notification + // of save so Storage can persist the save (if necessary). + // In addition, we shouldn't issue save when opening of the doc fails. Log::info("Non-deliberate shutdown of the last session, saving the document before tearing down."); queue.put("uno .uno:Save"); } @@ -1360,7 +1362,7 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/) threadPool.joinAll(); // Terminate child processes - IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\r\n"); + IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, "eof\n"); Log::info("Requesting child process " + std::to_string(brokerPid) + " to terminate"); Util::requestTermination(brokerPid); diff --git a/loolwsd/MasterProcessSession.cpp b/loolwsd/MasterProcessSession.cpp index f274a0e..90a82af 100644 --- a/loolwsd/MasterProcessSession.cpp +++ b/loolwsd/MasterProcessSession.cpp @@ -441,6 +441,7 @@ bool MasterProcessSession::loadDocument(const char* /*buffer*/, int /*length*/, // Finally, wait for the Child to connect to Master, // link the document in jail and dispatch load to child. + Log::trace("Dispatching child to handle [load]."); dispatchChild(); return true; @@ -463,7 +464,10 @@ bool MasterProcessSession::getStatus(const char *buffer, int length) } if (_peer.expired()) + { + Log::trace("Dispatching child to handle [getStatus]."); dispatchChild(); + } forwardToPeer(buffer, length); return true; } @@ -757,8 +761,8 @@ void MasterProcessSession::dispatchChild() { Log::info() << "Retrying child permission... " << retries << Log::end; // request again new URL session - const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + "\r\n"; - Log::trace("MasterToBroker: " + message.substr(0, message.length()-2)); + const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + '\n'; + Log::trace("MasterToBroker: " + message.substr(0, message.length()-1)); IoUtil::writeFIFO(LOOLWSD::BrokerWritePipe, message); } } _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits