net/DelaySocket.cpp | 158 ++++++++++++++++++++++++++++------------------------ 1 file changed, 87 insertions(+), 71 deletions(-)
New commits: commit 7b24ecd447bf6f9d8066a9c1547aab6e6f193502 Author: Michael Meeks <michael.me...@collabora.com> Date: Sat Apr 22 18:37:38 2017 +0100 DelaySocket: hide logging. diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp index d2b0f7db..29a7fc1d 100644 --- a/net/DelaySocket.cpp +++ b/net/DelaySocket.cpp @@ -11,6 +11,8 @@ #include "net/DelaySocket.hpp" +#define DELAY_LOG(X) std::cerr << X << "\n"; + class Delayer; // FIXME: TerminatingPoll ? @@ -79,8 +81,8 @@ public: int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>( (*_chunks.begin())->_sendTime - now).count(); if (remainingMs < timeoutMaxMs) - std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs - << "ms from " << timeoutMaxMs << "ms\n"; + DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs + << "ms from " << timeoutMaxMs << "ms\n"); timeoutMaxMs = std::min(timeoutMaxMs, remainingMs); } @@ -116,8 +118,7 @@ public: shutdown(); break; } - std::cerr << "#" << getFD() << " changed to state " - << newState << "\n"; + DELAY_LOG("#" << getFD() << " changed to state " << newState << "\n"); _state = newState; } @@ -138,8 +139,8 @@ public: changeState(EofFlushWrites); else if (len >= 0) { - std::cerr << "#" << getFD() << " read " << len - << " to queue: " << _chunks.size() << "\n"; + DELAY_LOG("#" << getFD() << " read " << len + << " to queue: " << _chunks.size() << "\n"); chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]); if (_dest) _dest->_chunks.push_back(chunk); @@ -148,7 +149,7 @@ public: } else if (errno != EAGAIN && errno != EWOULDBLOCK) { - std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n"; + DELAY_LOG("#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n"); changeState(Closed); // FIXME - propagate the error ? } } @@ -166,7 +167,7 @@ public: { if (chunk->_data.size() == 0) { // delayed error or close - std::cerr << "#" << getFD() << " handling delayed close\n"; + DELAY_LOG("#" << getFD() << " handling delayed close\n"); changeState(Closed); } else @@ -180,21 +181,23 @@ public: { if (errno == EAGAIN || errno == EWOULDBLOCK) { - std::cerr << "#" << getFD() << " full - waiting for write\n"; + DELAY_LOG("#" << getFD() << " full - waiting for write\n"); } else { - std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of " + DELAY_LOG("#" << getFD() << " failed onwards write " + << len << "bytes of " << chunk->_data.size() - << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n"; + << " queue: " << _chunks.size() << " error " + << strerror(errno) << "\n"); changeState(Closed); } } else { - std::cerr << "#" << getFD() << " written onwards " << len << "bytes of " + DELAY_LOG("#" << getFD() << " written onwards " << len << "bytes of " << chunk->_data.size() - << " queue: " << _chunks.size() << "\n"; + << " queue: " << _chunks.size() << "\n"); if (len > 0) chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len); @@ -207,7 +210,7 @@ public: if (events & (POLLERR | POLLHUP | POLLNVAL)) { - std::cerr << "#" << getFD() << " error events: " << events << "\n"; + DELAY_LOG("#" << getFD() << " error events: " << events << "\n"); changeState(Closed); } commit efced88935d88c534bef9d7f6ae6ad7539d7b51d Author: Michael Meeks <michael.me...@collabora.com> Date: Sat Apr 22 18:30:37 2017 +0100 DelaySocket: working, and much cleaner / simpler. diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp index a27c618f..d2b0f7db 100644 --- a/net/DelaySocket.cpp +++ b/net/DelaySocket.cpp @@ -19,10 +19,11 @@ static SocketPoll DelayPoll("delay_poll"); /// Reads from fd, delays that and then writes to _dest. class DelaySocket : public Socket { int _delayMs; - bool _closed; - bool _stopPoll; - bool _waitForWrite; - std::shared_ptr<DelaySocket> _dest; + enum State { ReadWrite, // normal socket + EofFlushWrites, // finish up writes and close + Closed }; + State _state; + std::shared_ptr<DelaySocket> _dest; // our writing twin. const size_t WindowSize = 64 * 1024; @@ -43,8 +44,8 @@ class DelaySocket : public Socket { std::vector<std::shared_ptr<WriteChunk>> _chunks; public: DelaySocket(int delayMs, int fd) : - Socket (fd), _delayMs(delayMs), _closed(false), - _stopPoll(false), _waitForWrite(false) + Socket (fd), _delayMs(delayMs), + _state(ReadWrite) { // setSocketBufferSize(Socket::DefaultSendBufferSize); } @@ -90,16 +91,39 @@ public: return POLLIN; } - void pushCloseChunk(bool bErrorSocket) + void pushCloseChunk() { - // socket in error state ? don't keep polling it. - _stopPoll |= bErrorSocket; _chunks.push_back(std::make_shared<WriteChunk>(_delayMs)); } + void changeState(State newState) + { + switch (newState) + { + case ReadWrite: + assert (false); + break; + case EofFlushWrites: + assert (_state == ReadWrite); + assert (_dest); + _dest->pushCloseChunk(); + _dest = nullptr; + break; + case Closed: + if (_dest && _state == ReadWrite) + _dest->pushCloseChunk(); + _dest = nullptr; + shutdown(); + break; + } + std::cerr << "#" << getFD() << " changed to state " + << newState << "\n"; + _state = newState; + } + HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override { - if (events & POLLIN) + if (_state == ReadWrite && (events & POLLIN)) { auto chunk = std::make_shared<WriteChunk>(_delayMs); @@ -110,19 +134,9 @@ public: len = ::read(getFD(), buf, toRead); } while (len < 0 && errno == EINTR); - if (len == 0) - { // EOF. - if (_dest) // FIXME: cut and paste ... - { - _dest->pushCloseChunk(false); - _dest = nullptr; - } - std::cerr << "EOF on input\n"; - shutdown(); - return HandleResult::SOCKET_CLOSED; - } - - if (len >= 0) + if (len == 0) // EOF. + changeState(EofFlushWrites); + else if (len >= 0) { std::cerr << "#" << getFD() << " read " << len << " to queue: " << _chunks.size() << "\n"; @@ -130,17 +144,21 @@ public: if (_dest) _dest->_chunks.push_back(chunk); else - std::cerr << "no destination for data\n"; + assert("no destination for data" && false); } else if (errno != EAGAIN && errno != EWOULDBLOCK) { std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n"; - pushCloseChunk(true); + changeState(Closed); // FIXME - propagate the error ? } } - // Write if we have delayed enough. - if (_chunks.size() > 0) + if (_chunks.size() == 0) + { + if (_state == EofFlushWrites) + changeState(Closed); + } + else // Write if we have delayed enough. { std::shared_ptr<WriteChunk> chunk = *_chunks.begin(); if (std::chrono::duration_cast<std::chrono::milliseconds>( @@ -149,59 +167,54 @@ public: if (chunk->_data.size() == 0) { // delayed error or close std::cerr << "#" << getFD() << " handling delayed close\n"; - _closed = true; - _dest = nullptr; - shutdown(); - return HandleResult::SOCKET_CLOSED; + changeState(Closed); } - - ssize_t len; - do { - len = ::write(getFD(), &chunk->_data[0], chunk->_data.size()); - } while (len < 0 && errno == EINTR); - - if (len < 0) + else { - if (errno == EAGAIN || errno == EWOULDBLOCK) + ssize_t len; + do { + len = ::write(getFD(), &chunk->_data[0], chunk->_data.size()); + } while (len < 0 && errno == EINTR); + + if (len < 0) { - std::cerr << "#" << getFD() << " full - waiting for write\n"; + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + std::cerr << "#" << getFD() << " full - waiting for write\n"; + } + else + { + std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of " + << chunk->_data.size() + << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n"; + changeState(Closed); + } } else { - std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of " + std::cerr << "#" << getFD() << " written onwards " << len << "bytes of " << chunk->_data.size() - << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n"; - // URGH - cut and paste ... - _closed = true; - _dest = nullptr; - // FIXME: tell dest we're dead ... [!] ... - shutdown(); - return HandleResult::SOCKET_CLOSED; - } - } - else - { - std::cerr << "#" << getFD() << " written onwards " << len << "bytes of " - << chunk->_data.size() - << " queue: " << _chunks.size() << "\n"; - if (len > 0) - { - chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len); - } + << " queue: " << _chunks.size() << "\n"; + if (len > 0) + chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len); - if (chunk->_data.size() == 0) - _chunks.erase(_chunks.begin(), _chunks.begin() + 1); + if (chunk->_data.size() == 0) + _chunks.erase(_chunks.begin(), _chunks.begin() + 1); + } } } } - // FIXME: ideally we could avoid polling & delay _closed state etc. if (events & (POLLERR | POLLHUP | POLLNVAL)) { std::cerr << "#" << getFD() << " error events: " << events << "\n"; - pushCloseChunk(true); + changeState(Closed); } - return HandleResult::CONTINUE; + + if (_state == Closed) + return HandleResult::SOCKET_CLOSED; + else + return HandleResult::CONTINUE; } }; _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits