loolwsd/LOOLBroker.cpp | 278 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 274 insertions(+), 4 deletions(-)
New commits: commit 85de9ac86f0ca858439ad14492f1df1ec1e26a4e Author: Henry Castro <hcas...@collabora.com> Date: Sun Sep 27 18:04:12 2015 -0400 loolwsd: add pipe thread handler diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp index 9097595..4737cc4 100644 --- a/loolwsd/LOOLBroker.cpp +++ b/loolwsd/LOOLBroker.cpp @@ -21,6 +21,7 @@ #include <cassert> #include <iostream> #include <fstream> +#include <deque> #include <Poco/Types.h> #include <Poco/Random.h> @@ -64,12 +65,17 @@ const std::string BROKER_PREFIX = "/tmp/lokit"; static int readerChild = -1; static int readerBroker = -1; +static int timeoutCounter = 0; static unsigned int forkCounter = 0; static unsigned int childCounter = 0; static std::mutex forkMutex; -static std::map<Poco::Process::PID, int> _childProcesses; +static std::deque<Process::PID> _emptyURL; +static std::map<Process::PID, int> _childProcesses; +static std::map<std::string, Process::PID> _cacheURL; + +Poco::NamedMutex _namedMutexLOOL("loolwsd"); namespace { @@ -232,6 +238,266 @@ namespace } } +class PipeRunnable: public Runnable +{ +public: + PipeRunnable() + { + _pStart = _pEnd = NULL; + } + + ssize_t getResponseLine(int nPipeReader, std::string& aLine) + { + ssize_t nBytes = -1; + aLine.clear(); + + while (true) + { + if ( _pStart == _pEnd ) + { + nBytes = Util::readMessage(nPipeReader, aBuffer, sizeof(aBuffer)); + if ( nBytes < 0 ) + { + _pStart = _pEnd = NULL; + break; + } + + _pStart = aBuffer; + _pEnd = aBuffer + nBytes; + } + + if ( _pStart != _pEnd ) + { + char aChar = *_pStart++; + while (_pStart != _pEnd && aChar != '\r' && aChar != '\n') + { + aLine += aChar; + aChar = *_pStart++; + } + + if ( aChar == '\r' && *_pStart == '\n') + { + _pStart++; + break; + } + } + } + + return nBytes; + } + + ssize_t sendMessage(int nPipeWriter, const std::string& aMessage) + { + ssize_t nBytes = -1; + + nBytes = Util::writeFIFO(nPipeWriter, aMessage.c_str(), aMessage.length()); + if ( nBytes < 0 ) + std::cout << Util::logPrefix() << "Error writting child: " << strerror(errno) << std::endl; + + return nBytes; + } + + ssize_t createThread(Process::PID nPID, const std::string& aTID) + { + std::string aResponse; + std::string aMessage = "thread " + aTID + "\r\n"; + return sendMessage(_childProcesses[nPID], aMessage); + } + + ssize_t updateURL(Process::PID nPID, const std::string& aURL) + { + std::string aResponse; + std::string aMessage = "url " + aURL + "\r\n"; + return sendMessage(_childProcesses[nPID], aMessage); + } + + Process::PID searchURL(const std::string& aURL) + { + ssize_t nBytes = -1; + Process::PID nPID = 0; + std::string aResponse; + std::string aMessage = "search " + aURL + "\r\n"; + + auto aIterator = _childProcesses.begin(); + for ( ; aIterator!=_childProcesses.end(); ++aIterator) + { + if ( !(aIterator->first > 0 && aIterator->second > 0) ) + { + //std::cout << Util::logPrefix() << "error iterator " << aIterator->second << " " << aMessage << std::endl; + continue; + } + + nBytes = Util::writeFIFO(aIterator->second, aMessage.c_str(), aMessage.length()); + if ( nBytes < 0 ) + { + std::cout << Util::logPrefix() << "Error writting child: " << aIterator->first << " " << strerror(errno) << std::endl; + break; + } + + nBytes = getResponseLine(readerChild, aResponse); + if ( nBytes < 0 ) + { + std::cout << Util::logPrefix() << "Error reading child: " << aIterator->first << " " << strerror(errno) << std::endl; + break; + } + + //std::cout << Util::logPrefix() << "response: " << aResponse << std::endl; + StringTokenizer tokens(aResponse, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + if (tokens[1] == "ok") + { + nPID = aIterator->first; + } + else if (tokens[1] == "empty") + { + _emptyURL.push_back(aIterator->first); + } + } + + if ( aIterator != _childProcesses.end() ) + { + _cacheURL.clear(); + _emptyURL.clear(); + } + + return (nBytes > 0 ? nPID : -1); + } + + void handleInput(const std::string& aMessage) + { + Process::PID nPID; + + //std::cout << Util::logPrefix() << "Broker,Input," << aMessage << std::endl; + StringTokenizer tokens(aMessage, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + if (tokens[0] == "request" && tokens.count() == 3) + { + std::string aTID = tokens[1]; + std::string aURL = tokens[2]; + + // check cache + auto aIterURL = _cacheURL.find(aURL); + if ( aIterURL != _cacheURL.end() ) + { + std::cout << Util::logPrefix() << "Cache Found: " << aIterURL->first << std::endl; + if (createThread(aIterURL->second, aTID) < 0) + std::cout << Util::logPrefix() << "Cache: Error creating thread: " << strerror(errno) << std::endl; + + return; + } + + // not found in cache, full search. + nPID = searchURL(aURL); + if ( nPID < 0) + return; + + if ( nPID > 0 ) + { + std::cout << Util::logPrefix() << "Search Found: " << nPID << std::endl; + if (createThread(nPID, aTID) < 0) + std::cout << Util::logPrefix() << "Search: Error creating thread: " << strerror(errno) << std::endl; + else + _cacheURL[aURL] = nPID; + + return; + } + + // not found, new URL session. + if ( _emptyURL.size() > 0 ) + { + auto aItem = _emptyURL.front(); + std::cout << Util::logPrefix() << "Not Found: " << aItem << std::endl; + if (updateURL(aItem, aURL) < 0) + { + std::cout << Util::logPrefix() << "New: Error update URL: " << strerror(errno) << std::endl; + return; + } + + if (createThread(aItem, aTID) < 0) + { + std::cout << Util::logPrefix() << "New: Error creating thread: " << strerror(errno) << std::endl; + return; + } + _emptyURL.pop_front(); + _cacheURL[aURL] = aItem; + } + + if (_emptyURL.size() == 0 ) + { + std::cout << Util::logPrefix() << "No available childs, fork new one" << std::endl; + forkCounter++; + } + } + } + + void run() override + { + std::string aMessage; + char aBuffer[1024*2]; + char* pStart; + char* pEnd; + + struct pollfd aPoll; + ssize_t nBytes = -1; + + aPoll.fd = readerBroker; + aPoll.events = POLLIN; + aPoll.revents = 0; + + pStart = aBuffer; + pEnd = aBuffer; + +#ifdef __linux + if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>("pipe_reader"), 0, 0, 0) != 0) + std::cout << Util::logPrefix() << "Cannot set thread name :" << strerror(errno) << std::endl; +#endif + + while (true) + { + if ( pStart == pEnd ) + { + (void)poll(&aPoll,1,-1); + + if( (aPoll.revents & POLLIN) != 0 ) + { + nBytes = Util::readFIFO(readerBroker, aBuffer, sizeof(aBuffer)); + if (nBytes < 0) + { + pStart = pEnd = NULL; + std::cout << Util::logPrefix() << "Error reading message :" << strerror(errno) << std::endl; + continue; + } + pStart = aBuffer; + pEnd = aBuffer + nBytes; + } + } + + if ( pStart != pEnd ) + { + char aChar = *pStart++; + while (pStart != pEnd && aChar != '\r' && aChar != '\n') + { + aMessage += aChar; + aChar = *pStart++; + } + + if ( aChar == '\r' && *pStart == '\n') + { + pStart++; + + forkMutex.lock(); + handleInput(aMessage); + aMessage.clear(); + forkMutex.unlock(); + } + } + } + } + +private: + char* _pStart; + char* _pEnd; + char aBuffer[1024]; +}; + /// Initializes LibreOfficeKit for cross-fork re-use. static bool globalPreinit(const std::string &loSubPath) { @@ -326,9 +592,6 @@ static int startupLibreOfficeKit(bool sharePages, int nLOKits, return pId; } -static int timeoutCounter = 0; -Poco::NamedMutex _namedMutexLOOL("loolwsd"); - // Broker process int main(int argc, char** argv) { @@ -498,6 +761,13 @@ int main(int argc, char** argv) exit(-1); } + PipeRunnable pipeHandler; + Poco::Thread aPipe; + + aPipe.start(pipeHandler); + + std::cout << Util::logPrefix() << "loolwsd ready!" << std::endl; + while (_childProcesses.size() > 0) { int status; _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/libreoffice-commits