loolwsd/Admin.cpp | 272 ++++++++++++++++++++++++++++++++++++++++ loolwsd/Admin.hpp | 243 +---------------------------------- loolwsd/AdminModel.hpp | 19 ++ loolwsd/ChildProcessSession.hpp | 2 loolwsd/LOOLBroker.cpp | 4 loolwsd/Makefile.am | 2 6 files changed, 308 insertions(+), 234 deletions(-)
New commits: commit 85c6467344710ef1717140c9d17fe6383b420746 Author: Pranav Kant <pran...@collabora.com> Date: Fri Feb 26 11:55:13 2016 +0530 loolwsd: Move Admin class to separate header As a test, add command to fetch documents from AdminModel. Change-Id: I3cb7097ba7dde049f3b2478fe7b6b6c309da1d92 Reviewed-on: https://gerrit.libreoffice.org/22781 Reviewed-by: Tor Lillqvist <t...@collabora.com> Tested-by: Tor Lillqvist <t...@collabora.com> diff --git a/loolwsd/Admin.cpp b/loolwsd/Admin.cpp new file mode 100644 index 0000000..e30d55e --- /dev/null +++ b/loolwsd/Admin.cpp @@ -0,0 +1,272 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include <cassert> +#include <sys/poll.h> +#include <sys/prctl.h> + +#include <Poco/Net/WebSocket.h> +#include <Poco/Net/HTTPRequestHandler.h> +#include <Poco/Net/HTTPServerParams.h> +#include <Poco/Net/HTTPServerRequest.h> +#include <Poco/Net/HTTPServerResponse.h> +#include <Poco/Net/NetException.h> +#include <Poco/StringTokenizer.h> + +#include "Admin.hpp" +#include "AdminModel.hpp" +#include "Common.hpp" +#include "LOOLProtocol.hpp" +#include "Util.hpp" + +using namespace LOOLProtocol; + +using Poco::Net::HTTPRequestHandler; +using Poco::Net::HTTPRequestHandlerFactory; +using Poco::Net::HTTPResponse; +using Poco::Net::HTTPServerParams; +using Poco::Net::HTTPServerRequest; +using Poco::Net::HTTPServerResponse; +using Poco::Net::ServerSocket; +using Poco::Net::WebSocket; +using Poco::Net::WebSocketException; +using Poco::StringTokenizer; +using Poco::Net::Socket; + +/// Handle admin requests. 
+class AdminRequestHandler: public HTTPRequestHandler +{ +public: + + AdminRequestHandler(Admin* adminManager) + : _admin(adminManager) + { } + + void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override + { + assert(request.serverAddress().port() == ADMIN_PORT_NUMBER); + + const std::string thread_name = "admin_ws"; + try + { + if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) + Log::error("Cannot set thread name to " + thread_name + "."); + + Log::debug("Thread [" + thread_name + "] started."); + + auto ws = std::make_shared<WebSocket>(request, response); + const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000); + int flags = 0; + int n = 0; + ws->setReceiveTimeout(0); + do + { + char buffer[200000]; //FIXME: Dynamic? + + if (ws->poll(waitTime, Socket::SELECT_READ)) + { + n = ws->receiveFrame(buffer, sizeof(buffer), flags); + + if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) + { + // Echo back the ping payload as pong. + // Technically, we should send back a PONG control frame. + // However Firefox (probably) or Node.js (possibly) doesn't + // like that and closes the socket when we do. + // Echoing the payload as a normal frame works with Firefox. + ws->sendFrame(buffer, n /*, WebSocket::FRAME_OP_PONG*/); + } + else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) + { + // In case we do send pings in the future. + } + else if (n <= 0) + { + // Connection closed. + Log::warn() << "Received " << n + << " bytes. Connection closed. Flags: " + << std::hex << flags << Log::end; + break; + } + else + { + assert(n > 0); + const std::string firstLine = getFirstLine(buffer, n); + StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + Log::trace() << "Recv: " << firstLine << Log::end; + + if (firstLine == "eof") + { + Log::info("Received EOF. 
Finishing."); + break; + } + + if (tokens.count() == 1 && tokens[0] == "stats") + { + //TODO: Collect stats and reply back to admin. + // We need to ask Broker to give us some numbers on docs/clients/etc. + // But we can also collect some memory info using system calls. + + std::string statsResponse; + + const auto cmd = "pstree -a -c -h -A -p " + std::to_string(getpid()); + FILE* fp = popen(cmd.c_str(), "r"); + if (fp == nullptr) + { + statsResponse = "error: failed to collect stats."; + ws->sendFrame(statsResponse.data(), statsResponse.size()); + continue; + } + + char treeBuffer[1024]; + while (fgets(treeBuffer, sizeof(treeBuffer)-1, fp) != nullptr) + { + statsResponse += treeBuffer; + statsResponse += "</ BR>\n"; + } + + pclose(fp); + + ws->sendFrame(statsResponse.data(), statsResponse.size()); + } + else if (tokens.count() == 1 && tokens[0] == "documents") + { + + std::string response = "documents " + _admin->getDocuments(); + ws->sendFrame(response.data(), response.size()); + } + } + } + } + while (!TerminationFlag && + (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE); + Log::debug() << "Finishing AdminProcessor. 
TerminationFlag: " << TerminationFlag + << ", payload size: " << n + << ", flags: " << std::hex << flags << Log::end; + } + catch (const WebSocketException& exc) + { + Log::error("AdminRequestHandler::handleRequest(), WebSocketException: " + exc.message()); + switch (exc.code()) + { + case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: + response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION); + // fallthrough + case WebSocket::WS_ERR_NO_HANDSHAKE: + case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION: + case WebSocket::WS_ERR_HANDSHAKE_NO_KEY: + response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST); + response.setContentLength(0); + response.send(); + break; + } + } + catch (const std::exception& exc) + { + Log::error(std::string("Exception: ") + exc.what()); + } + + Log::debug("Thread [" + thread_name + "] finished."); + } + +private: + Admin* _admin; +}; + +//TODO: Move to common header. +class AdminRequestHandlerFactory: public HTTPRequestHandlerFactory +{ +public: + AdminRequestHandlerFactory(Admin* adminManager) + : _admin(adminManager) + {} + + HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override + { + auto logger = Log::info(); + logger << "Request from " << request.clientAddress().toString() << ": " + << request.getMethod() << " " << request.getURI() << " " + << request.getVersion(); + + for (HTTPServerRequest::ConstIterator it = request.begin(); it != request.end(); ++it) + { + logger << " / " << it->first << ": " << it->second; + } + + logger << Log::end; + return new AdminRequestHandler(_admin); + } + +private: + Admin* _admin; +}; + +/// An admin command processor. 
+/// Creates the admin HTTP server on a ServerSocket for ADMIN_PORT_NUMBER and
+/// records the broker/notify pipe descriptors in the class-wide statics.
+Admin::Admin(const int brokerPipe, const int notifyPipe) :
+    _srv(new AdminRequestHandlerFactory(this), ServerSocket(ADMIN_PORT_NUMBER), new HTTPServerParams),
+    _model()
+{
+    Admin::BrokerPipe = brokerPipe;
+    Admin::NotifyPipe = notifyPipe;
+}
+
+/// Stops the admin HTTP server.
+Admin::~Admin()
+{
+    Log::info("~Admin dtor.");
+    _srv.stop();
+}
+
+/// Returns the document list exactly as produced by AdminModel::getDocuments().
+///
+/// NOTE(review): this is called from AdminRequestHandler::handleRequest while
+/// handleInput() updates the model from the pollPipeForReading callback; if
+/// those run on different threads the model needs locking - confirm.
+std::string Admin::getDocuments()
+{
+    return _model.getDocuments();
+}
+
+/// Handles one message read from the notify pipe.
+///
+/// Only "document <pid> <url> ..." is acted upon: the pair is recorded in the
+/// model. A pid that is not a valid int is logged and the message is dropped,
+/// so a malformed line cannot throw out of the pipe-polling callback.
+void Admin::handleInput(std::string& message)
+{
+    StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+    if (tokens.count() <= 2 || tokens[0] != "document")
+        return;
+
+    const std::string& pidToken = tokens[1];
+    const std::string& url = tokens[2];
+
+    int pid = 0;
+    try
+    {
+        // std::stoi throws on non-numeric or out-of-range input.
+        pid = std::stoi(pidToken);
+    }
+    catch (const std::exception& exc)
+    {
+        Log::error("Admin::handleInput: ignoring document message with invalid pid [" +
+                   pidToken + "]: " + exc.what());
+        return;
+    }
+
+    _model.addDocument(pid, url);
+}
+
+/// Starts the admin HTTP server, names the calling thread "admin_thread" and
+/// then feeds every message read from the notify pipe to handleInput() until
+/// Util::pollPipeForReading returns.
+void Admin::run()
+{
+    Log::info("Listening on Admin port " + std::to_string(ADMIN_PORT_NUMBER));
+
+    // Start a server listening on the admin port.
+    _srv.start();
+
+    // Start listening for data changes
+    struct pollfd pollPipeNotify;
+    pollPipeNotify.fd = NotifyPipe;
+    pollPipeNotify.events = POLLIN;
+    pollPipeNotify.revents = 0;
+
+    static const std::string thread_name = "admin_thread";
+
+    if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0)
+        Log::error("Cannot set thread name to " + thread_name + ".");
+
+    Log::info("Thread [" + thread_name + "] started.");
+
+    Util::pollPipeForReading(pollPipeNotify, FIFO_NOTIFY, NotifyPipe,
+                            [this](std::string& message) { return handleInput(message); } );
+
+    Log::debug("Thread [" + thread_name + "] finished.");
+}
+
+//TODO: Clean up with something more elegant.
+int Admin::BrokerPipe; +int Admin::NotifyPipe; + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/loolwsd/Admin.hpp b/loolwsd/Admin.hpp index 1216505..0356213 100644 --- a/loolwsd/Admin.hpp +++ b/loolwsd/Admin.hpp @@ -10,262 +10,39 @@ #ifndef INCLUDED_ADMIN_HPP #define INCLUDED_ADMIN_HPP -#include <cassert> -#include <condition_variable> -#include <map> -#include <memory> -#include <mutex> -#include <ostream> -#include <set> -#include <sys/poll.h> - -#include <Poco/Net/WebSocket.h> -#include <Poco/Buffer.h> -#include <Poco/Path.h> -#include <Poco/StringTokenizer.h> -#include <Poco/Types.h> #include <Poco/Net/HTTPServer.h> -#include <Poco/Net/HTTPServerParams.h> -#include <Poco/Net/HTTPServerParams.h> -#include <Poco/Net/HTTPServerRequest.h> -#include <Poco/Net/HTTPServerResponse.h> +#include <Poco/Runnable.h> +#include <Poco/Types.h> #include "AdminModel.hpp" -#include "Common.hpp" -#include "LOOLProtocol.hpp" -#include "Util.hpp" - -using namespace LOOLProtocol; - -using Poco::Exception; -using Poco::File; -using Poco::Net::HTTPRequest; -using Poco::Net::HTTPRequestHandler; -using Poco::Net::HTTPRequestHandlerFactory; -using Poco::Net::HTTPResponse; -using Poco::Net::HTTPServer; -using Poco::Net::HTTPServerParams; -using Poco::Net::HTTPServerRequest; -using Poco::Net::HTTPServerResponse; -using Poco::Net::ServerSocket; -using Poco::Net::WebSocket; -using Poco::Net::WebSocketException; -using Poco::Path; -using Poco::Runnable; -using Poco::StringTokenizer; -using Poco::Net::Socket; const std::string FIFO_NOTIFY = "loolnotify.fifo"; -/// Handle admin requests. 
-class AdminRequestHandler: public HTTPRequestHandler -{ -public: - - void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) override - { - assert(request.serverAddress().port() == ADMIN_PORT_NUMBER); - - const std::string thread_name = "admin_ws"; - try - { - if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) - Log::error("Cannot set thread name to " + thread_name + "."); - - Log::debug("Thread [" + thread_name + "] started."); - - auto ws = std::make_shared<WebSocket>(request, response); - const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000); - int flags = 0; - int n = 0; - ws->setReceiveTimeout(0); - do - { - char buffer[200000]; //FIXME: Dynamic? - - if (ws->poll(waitTime, Socket::SELECT_READ)) - { - n = ws->receiveFrame(buffer, sizeof(buffer), flags); - - if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) - { - // Echo back the ping payload as pong. - // Technically, we should send back a PONG control frame. - // However Firefox (probably) or Node.js (possibly) doesn't - // like that and closes the socket when we do. - // Echoing the payload as a normal frame works with Firefox. - ws->sendFrame(buffer, n /*, WebSocket::FRAME_OP_PONG*/); - } - else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) - { - // In case we do send pings in the future. - } - else if (n <= 0) - { - // Connection closed. - Log::warn() << "Received " << n - << " bytes. Connection closed. Flags: " - << std::hex << flags << Log::end; - break; - } - else - { - assert(n > 0); - const std::string firstLine = getFirstLine(buffer, n); - StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - - if (firstLine == "eof") - { - Log::info("Received EOF. Finishing."); - break; - } - - if (tokens.count() == 1 && tokens[0] == "stats") - { - //TODO: Collect stats and reply back to admin. 
- // We need to ask Broker to give us some numbers on docs/clients/etc. - // But we can also collect some memory info using system calls. - - std::string statsResponse; - - const auto cmd = "pstree -a -c -h -A -p " + std::to_string(getpid()); - FILE* fp = popen(cmd.c_str(), "r"); - if (fp == nullptr) - { - statsResponse = "error: failed to collect stats."; - ws->sendFrame(statsResponse.data(), statsResponse.size()); - continue; - } - - char treeBuffer[1024]; - while (fgets(treeBuffer, sizeof(treeBuffer)-1, fp) != nullptr) - { - statsResponse += treeBuffer; - statsResponse += "</ BR>\n"; - } - - pclose(fp); - - ws->sendFrame(statsResponse.data(), statsResponse.size()); - } - } - } - } - while (!TerminationFlag && - (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE); - Log::debug() << "Finishing AdminProcessor. TerminationFlag: " << TerminationFlag - << ", payload size: " << n - << ", flags: " << std::hex << flags << Log::end; - } - catch (const WebSocketException& exc) - { - Log::error("AdminRequestHandler::handleRequest(), WebSocketException: " + exc.message()); - switch (exc.code()) - { - case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: - response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION); - // fallthrough - case WebSocket::WS_ERR_NO_HANDSHAKE: - case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION: - case WebSocket::WS_ERR_HANDSHAKE_NO_KEY: - response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST); - response.setContentLength(0); - response.send(); - break; - } - } - catch (const std::exception& exc) - { - Log::error(std::string("Exception: ") + exc.what()); - } - - Log::debug("Thread [" + thread_name + "] finished."); - } -}; - -//TODO: Move to common header. 
-class AdminRequestHandlerFactory: public HTTPRequestHandlerFactory -{ -public: - HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override - { - auto logger = Log::info(); - logger << "Request from " << request.clientAddress().toString() << ": " - << request.getMethod() << " " << request.getURI() << " " - << request.getVersion(); - - for (HTTPServerRequest::ConstIterator it = request.begin(); it != request.end(); ++it) - { - logger << " / " << it->first << ": " << it->second; - } - - logger << Log::end; - return new AdminRequestHandler(); - } -}; - /// An admin command processor. -class Admin : public Runnable +class Admin : public Poco::Runnable { public: - Admin(const int brokerPipe, const int notifyPipe) : - _srv(new AdminRequestHandlerFactory(), ServerSocket(ADMIN_PORT_NUMBER), new HTTPServerParams), - _model(AdminModel()) - { - Admin::BrokerPipe = brokerPipe; - Admin::NotifyPipe = notifyPipe; - } + Admin(const int brokerPipe, const int notifyPipe); - ~Admin() - { - Log::info("~Admin dtor."); - _srv.stop(); - } + ~Admin(); static int getBrokerPid() { return Admin::BrokerPipe; } - void handleInput(std::string& message) - { - std::cout << message << std::endl; - } - - void run() override - { - Log::info("Listening on Admin port " + std::to_string(ADMIN_PORT_NUMBER)); - - // Start a server listening on the admin port. 
- _srv.start(); - - struct pollfd pollPipeNotify; - - pollPipeNotify.fd = NotifyPipe; - pollPipeNotify.events = POLLIN; - pollPipeNotify.revents = 0; + std::string getDocuments(); - static const std::string thread_name = "admin_thread"; + void run() override; - if (prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(thread_name.c_str()), 0, 0, 0) != 0) - Log::error("Cannot set thread name to " + thread_name + "."); - - Log::info("Thread [" + thread_name + "] started."); - - Util::pollPipeForReading(pollPipeNotify, FIFO_NOTIFY, NotifyPipe, - [this](std::string& message) { return handleInput(message); } ); - - Log::debug("Thread [" + thread_name + "] finished."); - } +private: + void handleInput(std::string& message); private: - HTTPServer _srv; + Poco::Net::HTTPServer _srv; AdminModel _model; static int BrokerPipe; static int NotifyPipe; }; -//TODO: Clean up with something more elegant. -int Admin::BrokerPipe; -int Admin::NotifyPipe; #endif /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/loolwsd/AdminModel.hpp b/loolwsd/AdminModel.hpp index 438666f..dd7cea3 100644 --- a/loolwsd/AdminModel.hpp +++ b/loolwsd/AdminModel.hpp @@ -26,6 +26,25 @@ public: { Log::info("AdminModel dtor."); } + + void addDocument(Poco::Process::PID pid, std::string url) + { + _documents[pid] = url; + } + + std::string getDocuments() + { + std::string response; + for (const auto& it: _documents) + { + response += std::to_string(it.first) + " " + it.second + " <BR/>"; + } + + return response; + } + +private: + std::map<Poco::Process::PID, std::string> _documents; }; #endif diff --git a/loolwsd/ChildProcessSession.hpp b/loolwsd/ChildProcessSession.hpp index b5e3511..0519fd8 100644 --- a/loolwsd/ChildProcessSession.hpp +++ b/loolwsd/ChildProcessSession.hpp @@ -18,6 +18,8 @@ #include <Poco/Thread.h> #include <Poco/NotificationQueue.h> + +#include "Common.hpp" #include "LOOLSession.hpp" // The client port number, which is changed via loolwsd args. 
diff --git a/loolwsd/LOOLBroker.cpp b/loolwsd/LOOLBroker.cpp index 2e759cf..d1ab001 100644 --- a/loolwsd/LOOLBroker.cpp +++ b/loolwsd/LOOLBroker.cpp @@ -249,6 +249,10 @@ public: } StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + if (tokens.count() > 1 && tokens[1] == "ok") + { + Util::writeFIFO(writerNotify, "document " + std::to_string(pid) + " " + url + " \r\n"); + } return (tokens.count() == 2 && tokens[0] == std::to_string(pid) && tokens[1] == "ok"); } diff --git a/loolwsd/Makefile.am b/loolwsd/Makefile.am index cbd6f02..abf3b7b 100644 --- a/loolwsd/Makefile.am +++ b/loolwsd/Makefile.am @@ -9,7 +9,7 @@ AM_LDFLAGS = -pthread shared_sources = LOOLProtocol.cpp LOOLSession.cpp MessageQueue.cpp Util.cpp -loolwsd_SOURCES = LOOLWSD.cpp ChildProcessSession.cpp MasterProcessSession.cpp TileCache.cpp $(shared_sources) +loolwsd_SOURCES = LOOLWSD.cpp ChildProcessSession.cpp MasterProcessSession.cpp TileCache.cpp Admin.cpp $(shared_sources) noinst_PROGRAMS = loadtest connect lokitclient _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits