 loolwsd/Admin.cpp | 12 +---
 loolwsd/IoUtil.cpp | 135 +++++++++++++++++++++++++++++++++----------------
 loolwsd/IoUtil.hpp | 29 +++++++++-
 loolwsd/LOOLBroker.cpp | 11 +--
 loolwsd/LOOLKit.cpp | 4 -
 5 files changed, 128 insertions(+), 63 deletions(-)
New commits: commit ecce874315984f01f6baa6ebcdb2a347d291c3c4 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Mon Mar 28 14:22:18 2016 -0400 loolwsd: new PipeReader class to poll and tokenize pipe messages Change-Id: I5676b313ca4c7e711ead04c1491fe36591a00531 Reviewed-on: https://gerrit.libreoffice.org/23644 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/Admin.cpp b/loolwsd/Admin.cpp index be9c93a..1bbfc22 100644 --- a/loolwsd/Admin.cpp +++ b/loolwsd/Admin.cpp @@ -481,12 +481,6 @@ void Admin::run() _cpuStatsTask = new CpuStats(this); _cpuStatsTimer.schedule(_cpuStatsTask, _cpuStatsTaskInterval, _cpuStatsTaskInterval); - // Start listening for data changes - struct pollfd pollPipeNotify; - pollPipeNotify.fd = NotifyPipe; - pollPipeNotify.events = POLLIN; - pollPipeNotify.revents = 0; - static const std::string thread_name = "admin_thread"; if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) @@ -494,8 +488,10 @@ void Admin::run() Log::info("Thread [" + thread_name + "] started."); - IoUtil::pollPipeForReading(pollPipeNotify, FIFO_NOTIFY, NotifyPipe, - [this](std::string& message) { return handleInput(message); } ); + // Start listening for data changes. + IoUtil::PipeReader pipeReader(FIFO_NOTIFY, NotifyPipe); + pipeReader.process([this](std::string& message) { handleInput(message); return true; }, + []() { return TerminationFlag; }); _memStatsTimer.cancel(); _cpuStatsTimer.cancel(); diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp index 70e386c..b2bd61e 100644 --- a/loolwsd/IoUtil.cpp +++ b/loolwsd/IoUtil.cpp @@ -268,62 +268,113 @@ ssize_t readMessage(const int pipe, char* buffer, const ssize_t size, const size return -1; } -void pollPipeForReading(pollfd& pollPipe, const std::string& targetPipeName , const int& targetPipe, - std::function<void(std::string& message)> handler) +/// Reads a single line from a pipe. 
+/// Returns 0 for timeout, <0 for error, and >0 on success. +/// On success, line will contain the read message. +int PipeReader::readLine(std::string& line, + std::function<bool()> stopPredicate, + const size_t timeoutMs) { - std::string message; - char buffer[READ_BUFFER_SIZE]; - char* start = buffer; - char* end = buffer; - ssize_t bytes = -1; + const char *endOfLine = static_cast<const char *>(std::memchr(_data.data(), '\n', _data.size())); + if (endOfLine != nullptr) + { + // We have a line cached, return it. + line += std::string(_data.data(), endOfLine); + _data.erase(0, endOfLine - _data.data() + 1); // Including the '\n'. + return 1; + } - while (!TerminationFlag) + // Poll in short intervals to check for stop condition. + const auto pollTimeoutMs = 500; + auto maxPollCount = timeoutMs / pollTimeoutMs; + while (maxPollCount-- > 0) { - if (start == end) + if (stopPredicate()) + { + Log::info() << "Spot requested for pipe: " << _name << Log::end; + return -1; + } + + struct pollfd pipe; + pipe.fd = _pipe; + pipe.events = POLLIN; + pipe.revents = 0; + const int ready = poll(&pipe, 1, pollTimeoutMs); + if (ready == 0) + { + // Timeout. + continue; + } + else if (ready < 0) + { + // error. + return ready; + } + else if (pipe.revents & (POLLIN | POLLPRI)) { - if (poll(&pollPipe, 1, POLL_TIMEOUT_MS) < 0) + char buffer[READ_BUFFER_SIZE]; + const auto bytes = readFIFO(_pipe, buffer, sizeof(buffer)); + if (bytes < 0) { - Log::error("Failed to poll pipe [" + targetPipeName + "]."); - continue; + return -1; } - else if (pollPipe.revents & (POLLIN | POLLPRI)) + + const char *endOfLine = static_cast<const char *>(std::memchr(buffer, '\n', bytes)); + if (endOfLine != nullptr) { - bytes = readFIFO(targetPipe, buffer, sizeof(buffer)); - if (bytes < 0) - { - start = end = nullptr; - Log::error("Error reading message from pipe [" + targetPipeName + "]."); - continue; - } - start = buffer; - end = buffer + bytes; + // Got end of line. 
+ line = _data; + auto tail = std::string(static_cast<const char*>(buffer), endOfLine); + line += tail; + _data = std::string(endOfLine, bytes - tail.size() - 1); // Exclude the '\n'. + return 1; } - else if (pollPipe.revents & (POLLERR | POLLHUP)) + else { - Log::error("Broken pipe [" + targetPipeName + "] with wsd."); - break; + // More data, keep going. + _data += std::string(buffer, bytes); } } - - if (start != end) + else if (pipe.revents & (POLLERR | POLLHUP | POLLNVAL)) { - char byteChar = *start++; - while (start != end && byteChar != '\r' && byteChar != '\n') - { - message += byteChar; - byteChar = *start++; - } + return -1; + } + } - if (byteChar == '\r' && *start == '\n') - { - start++; - Log::debug(targetPipeName + " recv: " + message); - if (message == "eof") - break; + // Timeout. + return 0; +} - handler(message); - message.clear(); - } +void PipeReader::process(std::function<bool(std::string& message)> handler, + std::function<bool()> stopPredicate, + const size_t pollTimeoutMs) +{ + bool stop = false; + for (;;) + { + stop = stopPredicate(); + if (stop) + { + Log::info("Termination flagged for pipe [" + _name + "]."); + break; + } + + std::string line; + const auto ready = readLine(line, stopPredicate, pollTimeoutMs); + if (ready == 0) + { + // Timeout. 
+ continue; + } + else if (ready < 0) + { + Log::error("Error reading from pipe [" + _name + "]."); + continue; + } + else if (!handler(line)) + { + Log::info("Pipe [" + _name + "] handler requested to finish."); + break; } } } diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp index 8a0f53d..c7a4e00 100644 --- a/loolwsd/IoUtil.hpp +++ b/loolwsd/IoUtil.hpp @@ -45,9 +45,32 @@ namespace IoUtil ssize_t readMessage(const int pipe, char* buffer, const ssize_t size, const size_t timeoutSec = CHILD_TIMEOUT_SECS); - void pollPipeForReading(pollfd& pollPipe, const std::string& targetPipeName , const int& targetPipe, - std::function<void(std::string& message)> handler); -}; + class PipeReader + { + public: + PipeReader(const std::string& name, const int pipe) : + _name(name), + _pipe(pipe) + { + } + + /// Reads a single line from the pipe. + /// Returns 0 for timeout, <0 for error, and >0 on success. + /// On success, line will contain the read message. + int readLine(std::string& line, + std::function<bool()> stopPredicate, + const size_t timeoutMs); + + void process(std::function<bool(std::string& message)> handler, + std::function<bool()> stopPredicate, + const size_t pollTimeoutMs = POLL_TIMEOUT_MS); + + private: + const std::string _name; + const int _pipe; + std::string _data; + }; +} #endif diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp index eab8777..a551365 100644 --- a/loolwsd/LOOLBroker.cpp +++ b/loolwsd/LOOLBroker.cpp @@ -373,12 +373,6 @@ public: void run() override { - struct pollfd pollPipeBroker; - - pollPipeBroker.fd = readerBroker; - pollPipeBroker.events = POLLIN; - pollPipeBroker.revents = 0; - static const std::string thread_name = "brk_pipe_reader"; if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) @@ -386,8 +380,9 @@ public: Log::debug("Thread [" + thread_name + "] started."); - IoUtil::pollPipeForReading(pollPipeBroker, FIFO_LOOLWSD, readerBroker, - [this](std::string& message) { return 
handleInput(message); } ); + IoUtil::PipeReader pipeReader(FIFO_LOOLWSD, readerBroker); + pipeReader.process([this](std::string& message) { handleInput(message); return true; }, + []() { return TerminationFlag; }); Log::debug("Thread [" + thread_name + "] finished."); } diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp index 8c9ef9f..299af4d 100644 --- a/loolwsd/LOOLKit.cpp +++ b/loolwsd/LOOLKit.cpp @@ -943,6 +943,8 @@ void lokit_main(const std::string& childRoot, Log::info("loolkit [" + std::to_string(Process::id()) + "] is ready."); + char buffer[READ_BUFFER_SIZE]; + std::string message; char* start = nullptr; char* end = nullptr; @@ -974,7 +976,6 @@ void lokit_main(const std::string& childRoot, else if (pollPipeBroker.revents & (POLLIN | POLLPRI)) { - char buffer[READ_BUFFER_SIZE]; const auto bytes = IoUtil::readFIFO(readerBroker, buffer, sizeof(buffer)); if (bytes < 0) { @@ -995,7 +996,6 @@ void lokit_main(const std::string& childRoot, if (start != end) { - std::string message; char byteChar = *start++; while (start != end && byteChar != '\r' && byteChar != '\n') { _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits