This is an automated email from the ASF dual-hosted git repository. jiashunzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push: new 93038edc Restruct event_dispatcher source file (#1888) 93038edc is described below commit 93038edc7f1e02ec8646e7a2ed6cb7bec135bbfe Author: 果冻虾仁 <guod...@apache.org> AuthorDate: Thu Aug 18 03:34:00 2022 +0800 Restruct event_dispatcher source file (#1888) Split event_dispatcher into separate operating system specific files --- BUILD.bazel | 6 +- CMakeLists.txt | 5 + Makefile | 3 +- src/brpc/event_dispatcher.cpp | 323 +-------------------- src/brpc/event_dispatcher.h | 3 - ...t_dispatcher.cpp => event_dispatcher_epoll.cpp} | 147 +--------- ..._dispatcher.cpp => event_dispatcher_kqueue.cpp} | 179 +----------- 7 files changed, 35 insertions(+), 631 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 7592a186..0cb60fa4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -495,6 +495,8 @@ cc_library( "src/brpc/thrift_service.cpp", "src/brpc/thrift_message.cpp", "src/brpc/policy/thrift_protocol.cpp", + "src/brpc/event_dispatcher_epoll.cpp", + "src/brpc/event_dispatcher_kqueue.cpp", ]) + select({ ":with_thrift" : glob([ "src/brpc/thrift*.cpp", @@ -503,7 +505,9 @@ cc_library( }), hdrs = glob([ "src/brpc/*.h", - "src/brpc/**/*.h" + "src/brpc/**/*.h", + "src/brpc/event_dispatcher_epoll.cpp", + "src/brpc/event_dispatcher_kqueue.cpp", ]), includes = [ "src/", diff --git a/CMakeLists.txt b/CMakeLists.txt index 5f46dc0e..713a090e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -392,6 +392,7 @@ file(GLOB_RECURSE BTHREAD_SOURCES "${PROJECT_SOURCE_DIR}/src/bthread/*.cpp") file(GLOB_RECURSE JSON2PB_SOURCES "${PROJECT_SOURCE_DIR}/src/json2pb/*.cpp") file(GLOB_RECURSE BRPC_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/*.cpp") file(GLOB_RECURSE THRIFT_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/thrift*.cpp") +file(GLOB_RECURSE EXCLUDE_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/event_dispatcher_*.cpp") if(WITH_THRIFT) message("brpc compile with thrift protocol") @@ -403,6 +404,10 @@ else() set(THRIFT_SOURCES "") endif() +foreach(v ${EXCLUDE_SOURCES}) + list(REMOVE_ITEM BRPC_SOURCES ${v}) +endforeach() + set(MCPACK2PB_SOURCES ${PROJECT_SOURCE_DIR}/src/mcpack2pb/field_type.cpp ${PROJECT_SOURCE_DIR}/src/mcpack2pb/mcpack2pb.cpp diff --git a/Makefile b/Makefile index a6d2fed4..f4ea71b4 100644 --- a/Makefile +++ b/Makefile @@ -196,8 +196,9 @@ JSON2PB_OBJS = $(addsuffix .o, $(basename $(JSON2PB_SOURCES))) BRPC_DIRS = src/brpc src/brpc/details src/brpc/builtin src/brpc/policy THRIFT_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/thrift*,$(SRCEXTS)))) +EXCLUDE_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/event_dispatcher_*,$(SRCEXTS)))) BRPC_SOURCES_ALL = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS)))) -BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES), $(BRPC_SOURCES_ALL)) +BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES) $(EXCLUDE_SOURCES), $(BRPC_SOURCES_ALL)) BRPC_PROTOS = $(filter %.proto,$(BRPC_SOURCES)) BRPC_CFAMILIES = $(filter-out %.proto %.pb.cc,$(BRPC_SOURCES)) BRPC_OBJS = $(BRPC_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(BRPC_CFAMILIES))) diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index 56b48a76..e6209286 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -23,15 +23,7 @@ #include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32 #include "bthread/bthread.h" // bthread_start_background #include "brpc/event_dispatcher.h" -#ifdef BRPC_SOCKET_HAS_EOF -#include "brpc/details/has_epollrdhup.h" -#endif #include "brpc/reloadable_flags.h" -#if defined(OS_MACOSX) -#include <sys/types.h> -#include <sys/event.h> -#include <sys/time.h> -#endif namespace brpc { @@ -40,313 +32,6 @@ DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); DEFINE_bool(usercode_in_pthread, false, "Call user's callback in pthreads, use bthreads otherwise"); -EventDispatcher::EventDispatcher() - : _epfd(-1) - , _stop(false) - , _tid(0) - , _consumer_thread_attr(BTHREAD_ATTR_NORMAL) -{ -#if defined(OS_LINUX) - _epfd = epoll_create(1024 * 1024); - if (_epfd < 0) { - PLOG(FATAL) << "Fail to create epoll"; - return; - } -#elif defined(OS_MACOSX) - _epfd = kqueue(); - if (_epfd < 0) { - PLOG(FATAL) << "Fail to create kqueue"; - return; - } -#else - #error Not implemented -#endif - CHECK_EQ(0, butil::make_close_on_exec(_epfd)); - - _wakeup_fds[0] = -1; - _wakeup_fds[1] = -1; - if (pipe(_wakeup_fds) != 0) { - PLOG(FATAL) << "Fail to create pipe"; - return; - } -} - -EventDispatcher::~EventDispatcher() { - Stop(); - Join(); - if (_epfd >= 0) { - close(_epfd); - _epfd = -1; - } - if (_wakeup_fds[0] > 0) { - close(_wakeup_fds[0]); - close(_wakeup_fds[1]); - } -} - -int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { - if (_epfd < 0) { -#if defined(OS_LINUX) - LOG(FATAL) << "epoll was not created"; -#elif defined(OS_MACOSX) - LOG(FATAL) << "kqueue was not created"; -#endif - return -1; - } - - if (_tid != 0) { - LOG(FATAL) << "Already started this dispatcher(" << this - << ") in bthread=" << _tid; - return -1; - } - - // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure - // everyting seems sane to the thread. - _consumer_thread_attr = (consumer_thread_attr ? - *consumer_thread_attr : BTHREAD_ATTR_NORMAL); - - //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread - // that created by epoll_wait() never to quit. - _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; - - // Polling thread uses the same attr for consumer threads (NORMAL right - // now). Previously, we used small stack (32KB) which may be overflowed - // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this - // is also a potential issue for consumer threads, using the same attr - // should be a reasonable solution. - int rc = bthread_start_background( - &_tid, &_epoll_thread_attr, RunThis, this); - if (rc) { - LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc); - return -1; - } - return 0; -} - -bool EventDispatcher::Running() const { - return !_stop && _epfd >= 0 && _tid != 0; -} - -void EventDispatcher::Stop() { - _stop = true; - - if (_epfd >= 0) { -#if defined(OS_LINUX) - epoll_event evt = { EPOLLOUT, { NULL } }; - epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt); -#elif defined(OS_MACOSX) - struct kevent kqueue_event; - EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE, - 0, 0, NULL); - kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL); -#endif - } -} - -void EventDispatcher::Join() { - if (_tid) { - bthread_join(_tid, NULL); - _tid = 0; - } -} - -int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { - if (_epfd < 0) { - errno = EINVAL; - return -1; - } - -#if defined(OS_LINUX) - epoll_event evt; - evt.data.u64 = socket_id; - evt.events = EPOLLOUT | EPOLLET; -#ifdef BRPC_SOCKET_HAS_EOF - evt.events |= has_epollrdhup; -#endif - if (pollin) { - evt.events |= EPOLLIN; - if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) { - // This fd has been removed from epoll via `RemoveConsumer', - // in which case errno will be ENOENT - return -1; - } - } else { - if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) { - return -1; - } - } -#elif defined(OS_MACOSX) - struct kevent evt; - //TODO(zhujiashun): add EV_EOF - EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { - return -1; - } - if (pollin) { - EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { - return -1; - } - } -#endif - return 0; -} - -int EventDispatcher::RemoveEpollOut(SocketId socket_id, - int fd, bool pollin) { -#if defined(OS_LINUX) - if (pollin) { - epoll_event evt; - evt.data.u64 = socket_id; - evt.events = EPOLLIN | EPOLLET; -#ifdef BRPC_SOCKET_HAS_EOF - evt.events |= has_epollrdhup; -#endif - return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt); - } else { - return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); - } -#elif defined(OS_MACOSX) - struct kevent evt; - EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { - return -1; - } - if (pollin) { - EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - return kevent(_epfd, &evt, 1, NULL, 0, NULL); - } - return 0; -#endif - return -1; -} - -int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { - if (_epfd < 0) { - errno = EINVAL; - return -1; - } -#if defined(OS_LINUX) - epoll_event evt; - evt.events = EPOLLIN | EPOLLET; - evt.data.u64 = socket_id; -#ifdef BRPC_SOCKET_HAS_EOF - evt.events |= has_epollrdhup; -#endif - return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt); -#elif defined(OS_MACOSX) - struct kevent evt; - EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - return kevent(_epfd, &evt, 1, NULL, 0, NULL); -#endif - return -1; -} - -int EventDispatcher::RemoveConsumer(int fd) { - if (fd < 0) { - return -1; - } - // Removing the consumer from dispatcher before closing the fd because - // if process was forked and the fd is not marked as close-on-exec, - // closing does not set reference count of the fd to 0, thus does not - // remove the fd from epoll. More badly, the fd will not be removable - // from epoll again! If the fd was level-triggered and there's data left, - // epoll_wait will keep returning events of the fd continuously, making - // program abnormal. -#if defined(OS_LINUX) - if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { - PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd; - return -1; - } -#elif defined(OS_MACOSX) - struct kevent evt; - EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - kevent(_epfd, &evt, 1, NULL, 0, NULL); - EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - kevent(_epfd, &evt, 1, NULL, 0, NULL); -#endif - return 0; -} - -void* EventDispatcher::RunThis(void* arg) { - ((EventDispatcher*)arg)->Run(); - return NULL; -} - -void EventDispatcher::Run() { - while (!_stop) { -#if defined(OS_LINUX) - epoll_event e[32]; -#ifdef BRPC_ADDITIONAL_EPOLL - // Performance downgrades in examples. - int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0); - if (n == 0) { - n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); - } -#else - const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); -#endif -#elif defined(OS_MACOSX) - struct kevent e[32]; - int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); -#endif - if (_stop) { - // epoll_ctl/epoll_wait should have some sort of memory fencing - // guaranteeing that we(after epoll_wait) see _stop set before - // epoll_ctl. - break; - } - if (n < 0) { - if (EINTR == errno) { - // We've checked _stop, no wake-up will be missed. - continue; - } -#if defined(OS_LINUX) - PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd; -#elif defined(OS_MACOSX) - PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd; -#endif - break; - } - for (int i = 0; i < n; ++i) { -#if defined(OS_LINUX) - if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP) -#ifdef BRPC_SOCKET_HAS_EOF - || (e[i].events & has_epollrdhup) -#endif - ) { - // We don't care about the return value. - Socket::StartInputEvent(e[i].data.u64, e[i].events, - _consumer_thread_attr); - } -#elif defined(OS_MACOSX) - if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) { - // We don't care about the return value. - Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter, - _consumer_thread_attr); - } -#endif - } - for (int i = 0; i < n; ++i) { -#if defined(OS_LINUX) - if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { - // We don't care about the return value. - Socket::HandleEpollOut(e[i].data.u64); - } -#elif defined(OS_MACOSX) - if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) { - // We don't care about the return value. - Socket::HandleEpollOut((SocketId)e[i].udata); - } -#endif - } - } -} - static EventDispatcher* g_edisp = NULL; static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; @@ -378,3 +63,11 @@ EventDispatcher& GetGlobalEventDispatcher(int fd) { } } // namespace brpc + +#if defined(OS_LINUX) + #include "brpc/event_dispatcher_epoll.cpp" +#elif defined(OS_MACOSX) + #include "brpc/event_dispatcher_kqueue.cpp" +#else + #error Not implemented +#endif diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index eaa57e36..b6cae400 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -94,9 +94,6 @@ private: // The attribute of bthreads calling user callbacks. bthread_attr_t _consumer_thread_attr; - // The attribute of bthread epoll_wait. - bthread_attr_t _epoll_thread_attr; - // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit int _wakeup_fds[2]; }; diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher_epoll.cpp similarity index 58% copy from src/brpc/event_dispatcher.cpp copy to src/brpc/event_dispatcher_epoll.cpp index 56b48a76..07d485e6 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -16,51 +16,23 @@ // under the License. -#include <gflags/gflags.h> // DEFINE_int32 -#include "butil/compat.h" -#include "butil/fd_utility.h" // make_close_on_exec -#include "butil/logging.h" // LOG -#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32 -#include "bthread/bthread.h" // bthread_start_background -#include "brpc/event_dispatcher.h" #ifdef BRPC_SOCKET_HAS_EOF #include "brpc/details/has_epollrdhup.h" #endif -#include "brpc/reloadable_flags.h" -#if defined(OS_MACOSX) -#include <sys/types.h> -#include <sys/event.h> -#include <sys/time.h> -#endif namespace brpc { -DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); - -DEFINE_bool(usercode_in_pthread, false, - "Call user's callback in pthreads, use bthreads otherwise"); - EventDispatcher::EventDispatcher() : _epfd(-1) , _stop(false) , _tid(0) , _consumer_thread_attr(BTHREAD_ATTR_NORMAL) { -#if defined(OS_LINUX) _epfd = epoll_create(1024 * 1024); if (_epfd < 0) { PLOG(FATAL) << "Fail to create epoll"; return; } -#elif defined(OS_MACOSX) - _epfd = kqueue(); - if (_epfd < 0) { - PLOG(FATAL) << "Fail to create kqueue"; - return; - } -#else - #error Not implemented -#endif CHECK_EQ(0, butil::make_close_on_exec(_epfd)); _wakeup_fds[0] = -1; @@ -86,11 +58,7 @@ EventDispatcher::~EventDispatcher() { int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { if (_epfd < 0) { -#if defined(OS_LINUX) LOG(FATAL) << "epoll was not created"; -#elif defined(OS_MACOSX) - LOG(FATAL) << "kqueue was not created"; -#endif return -1; } @@ -100,14 +68,14 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { return -1; } - // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure + // Set _consumer_thread_attr before creating epoll thread to make sure // everyting seems sane to the thread. _consumer_thread_attr = (consumer_thread_attr ? *consumer_thread_attr : BTHREAD_ATTR_NORMAL); //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread // that created by epoll_wait() never to quit. - _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; + bthread_attr_t epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; // Polling thread uses the same attr for consumer threads (NORMAL right // now). Previously, we used small stack (32KB) which may be overflowed @@ -115,9 +83,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { // is also a potential issue for consumer threads, using the same attr // should be a reasonable solution. int rc = bthread_start_background( - &_tid, &_epoll_thread_attr, RunThis, this); + &_tid, &epoll_thread_attr, RunThis, this); if (rc) { - LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc); + LOG(FATAL) << "Fail to create epoll thread: " << berror(rc); return -1; } return 0; @@ -131,15 +99,8 @@ void EventDispatcher::Stop() { _stop = true; if (_epfd >= 0) { -#if defined(OS_LINUX) epoll_event evt = { EPOLLOUT, { NULL } }; epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt); -#elif defined(OS_MACOSX) - struct kevent kqueue_event; - EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE, - 0, 0, NULL); - kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL); -#endif } } @@ -156,7 +117,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { return -1; } -#if defined(OS_LINUX) epoll_event evt; evt.data.u64 = socket_id; evt.events = EPOLLOUT | EPOLLET; @@ -175,28 +135,11 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { return -1; } } -#elif defined(OS_MACOSX) - struct kevent evt; - //TODO(zhujiashun): add EV_EOF - EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { - return -1; - } - if (pollin) { - EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { - return -1; - } - } -#endif return 0; } int EventDispatcher::RemoveEpollOut(SocketId socket_id, int fd, bool pollin) { -#if defined(OS_LINUX) if (pollin) { epoll_event evt; evt.data.u64 = socket_id; @@ -208,19 +151,6 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id, } else { return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); } -#elif defined(OS_MACOSX) - struct kevent evt; - EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { - return -1; - } - if (pollin) { - EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - return kevent(_epfd, &evt, 1, NULL, 0, NULL); - } - return 0; -#endif return -1; } @@ -229,7 +159,6 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { errno = EINVAL; return -1; } -#if defined(OS_LINUX) epoll_event evt; evt.events = EPOLLIN | EPOLLET; evt.data.u64 = socket_id; @@ -237,12 +166,6 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { evt.events |= has_epollrdhup; #endif return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt); -#elif defined(OS_MACOSX) - struct kevent evt; - EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - return kevent(_epfd, &evt, 1, NULL, 0, NULL); -#endif return -1; } @@ -257,18 +180,10 @@ int EventDispatcher::RemoveConsumer(int fd) { // from epoll again! If the fd was level-triggered and there's data left, // epoll_wait will keep returning events of the fd continuously, making // program abnormal. -#if defined(OS_LINUX) if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd; return -1; } -#elif defined(OS_MACOSX) - struct kevent evt; - EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - kevent(_epfd, &evt, 1, NULL, 0, NULL); - EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - kevent(_epfd, &evt, 1, NULL, 0, NULL); -#endif return 0; } @@ -279,7 +194,6 @@ void* EventDispatcher::RunThis(void* arg) { void EventDispatcher::Run() { while (!_stop) { -#if defined(OS_LINUX) epoll_event e[32]; #ifdef BRPC_ADDITIONAL_EPOLL // Performance downgrades in examples. @@ -289,10 +203,6 @@ void EventDispatcher::Run() { } #else const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); -#endif -#elif defined(OS_MACOSX) - struct kevent e[32]; - int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); #endif if (_stop) { // epoll_ctl/epoll_wait should have some sort of memory fencing @@ -305,15 +215,10 @@ void EventDispatcher::Run() { // We've checked _stop, no wake-up will be missed. continue; } -#if defined(OS_LINUX) PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd; -#elif defined(OS_MACOSX) - PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd; -#endif break; } for (int i = 0; i < n; ++i) { -#if defined(OS_LINUX) if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP) #ifdef BRPC_SOCKET_HAS_EOF || (e[i].events & has_epollrdhup) @@ -323,58 +228,14 @@ void EventDispatcher::Run() { Socket::StartInputEvent(e[i].data.u64, e[i].events, _consumer_thread_attr); } -#elif defined(OS_MACOSX) - if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) { - // We don't care about the return value. - Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter, - _consumer_thread_attr); - } -#endif } for (int i = 0; i < n; ++i) { -#if defined(OS_LINUX) if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { // We don't care about the return value. Socket::HandleEpollOut(e[i].data.u64); } -#elif defined(OS_MACOSX) - if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) { - // We don't care about the return value. - Socket::HandleEpollOut((SocketId)e[i].udata); - } -#endif } } } -static EventDispatcher* g_edisp = NULL; -static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; - -static void StopAndJoinGlobalDispatchers() { - for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) { - g_edisp[i].Stop(); - g_edisp[i].Join(); - } -} -void InitializeGlobalDispatchers() { - g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num]; - for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) { - const bthread_attr_t attr = FLAGS_usercode_in_pthread ? - BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; - CHECK_EQ(0, g_edisp[i].Start(&attr)); - } - // This atexit is will be run before g_task_control.stop() because above - // Start() initializes g_task_control by creating bthread (to run epoll/kqueue). - CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers)); -} - -EventDispatcher& GetGlobalEventDispatcher(int fd) { - pthread_once(&g_edisp_once, InitializeGlobalDispatchers); - if (FLAGS_event_dispatcher_num == 1) { - return g_edisp[0]; - } - int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num; - return g_edisp[index]; -} - } // namespace brpc diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher_kqueue.cpp similarity index 52% copy from src/brpc/event_dispatcher.cpp copy to src/brpc/event_dispatcher_kqueue.cpp index 56b48a76..614cd3bc 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher_kqueue.cpp @@ -16,51 +16,23 @@ // under the License. -#include <gflags/gflags.h> // DEFINE_int32 -#include "butil/compat.h" -#include "butil/fd_utility.h" // make_close_on_exec -#include "butil/logging.h" // LOG -#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32 -#include "bthread/bthread.h" // bthread_start_background -#include "brpc/event_dispatcher.h" -#ifdef BRPC_SOCKET_HAS_EOF -#include "brpc/details/has_epollrdhup.h" -#endif -#include "brpc/reloadable_flags.h" -#if defined(OS_MACOSX) #include <sys/types.h> #include <sys/event.h> #include <sys/time.h> -#endif namespace brpc { -DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); - -DEFINE_bool(usercode_in_pthread, false, - "Call user's callback in pthreads, use bthreads otherwise"); - EventDispatcher::EventDispatcher() : _epfd(-1) , _stop(false) , _tid(0) , _consumer_thread_attr(BTHREAD_ATTR_NORMAL) { -#if defined(OS_LINUX) - _epfd = epoll_create(1024 * 1024); - if (_epfd < 0) { - PLOG(FATAL) << "Fail to create epoll"; - return; - } -#elif defined(OS_MACOSX) _epfd = kqueue(); if (_epfd < 0) { PLOG(FATAL) << "Fail to create kqueue"; return; } -#else - #error Not implemented -#endif CHECK_EQ(0, butil::make_close_on_exec(_epfd)); _wakeup_fds[0] = -1; @@ -86,11 +58,7 @@ EventDispatcher::~EventDispatcher() { int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { if (_epfd < 0) { -#if defined(OS_LINUX) - LOG(FATAL) << "epoll was not created"; -#elif defined(OS_MACOSX) LOG(FATAL) << "kqueue was not created"; -#endif return -1; } @@ -100,14 +68,14 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { return -1; } - // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure + // Set _consumer_thread_attr before creating kqueue thread to make sure // everyting seems sane to the thread. _consumer_thread_attr = (consumer_thread_attr ? *consumer_thread_attr : BTHREAD_ATTR_NORMAL); //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread - // that created by epoll_wait() never to quit. - _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; + // that created by kevent() never to quit. + bthread_attr_t kqueue_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; // Polling thread uses the same attr for consumer threads (NORMAL right // now). Previously, we used small stack (32KB) which may be overflowed @@ -115,9 +83,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { // is also a potential issue for consumer threads, using the same attr // should be a reasonable solution. int rc = bthread_start_background( - &_tid, &_epoll_thread_attr, RunThis, this); + &_tid, &kqueue_thread_attr, RunThis, this); if (rc) { - LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc); + LOG(FATAL) << "Fail to create kqueue thread: " << berror(rc); return -1; } return 0; @@ -131,15 +99,10 @@ void EventDispatcher::Stop() { _stop = true; if (_epfd >= 0) { -#if defined(OS_LINUX) - epoll_event evt = { EPOLLOUT, { NULL } }; - epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt); -#elif defined(OS_MACOSX) struct kevent kqueue_event; EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL); kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL); -#endif } } @@ -156,26 +119,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { return -1; } -#if defined(OS_LINUX) - epoll_event evt; - evt.data.u64 = socket_id; - evt.events = EPOLLOUT | EPOLLET; -#ifdef BRPC_SOCKET_HAS_EOF - evt.events |= has_epollrdhup; -#endif - if (pollin) { - evt.events |= EPOLLIN; - if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) { - // This fd has been removed from epoll via `RemoveConsumer', - // in which case errno will be ENOENT - return -1; - } - } else { - if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) { - return -1; - } - } -#elif defined(OS_MACOSX) struct kevent evt; //TODO(zhujiashun): add EV_EOF EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, @@ -190,25 +133,11 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { return -1; } } -#endif return 0; } int EventDispatcher::RemoveEpollOut(SocketId socket_id, int fd, bool pollin) { -#if defined(OS_LINUX) - if (pollin) { - epoll_event evt; - evt.data.u64 = socket_id; - evt.events = EPOLLIN | EPOLLET; -#ifdef BRPC_SOCKET_HAS_EOF - evt.events |= has_epollrdhup; -#endif - return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt); - } else { - return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); - } -#elif defined(OS_MACOSX) struct kevent evt; EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { @@ -220,8 +149,6 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id, return kevent(_epfd, &evt, 1, NULL, 0, NULL); } return 0; -#endif - return -1; } int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { @@ -229,21 +156,10 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { errno = EINVAL; return -1; } -#if defined(OS_LINUX) - epoll_event evt; - evt.events = EPOLLIN | EPOLLET; - evt.data.u64 = socket_id; -#ifdef BRPC_SOCKET_HAS_EOF - evt.events |= has_epollrdhup; -#endif - return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt); -#elif defined(OS_MACOSX) struct kevent evt; EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, (void*)socket_id); return kevent(_epfd, &evt, 1, NULL, 0, NULL); -#endif - return -1; } int EventDispatcher::RemoveConsumer(int fd) { @@ -253,22 +169,15 @@ int EventDispatcher::RemoveConsumer(int fd) { // Removing the consumer from dispatcher before closing the fd because // if process was forked and the fd is not marked as close-on-exec, // closing does not set reference count of the fd to 0, thus does not - // remove the fd from epoll. More badly, the fd will not be removable - // from epoll again! If the fd was level-triggered and there's data left, - // epoll_wait will keep returning events of the fd continuously, making + // remove the fd from kqueue More badly, the fd will not be removable + // from kqueue again! If the fd was level-triggered and there's data left, + // kevent will keep returning events of the fd continuously, making // program abnormal. -#if defined(OS_LINUX) - if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { - PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd; - return -1; - } -#elif defined(OS_MACOSX) struct kevent evt; EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); kevent(_epfd, &evt, 1, NULL, 0, NULL); EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); kevent(_epfd, &evt, 1, NULL, 0, NULL); -#endif return 0; } @@ -279,25 +188,12 @@ void* EventDispatcher::RunThis(void* arg) { void EventDispatcher::Run() { while (!_stop) { -#if defined(OS_LINUX) - epoll_event e[32]; -#ifdef BRPC_ADDITIONAL_EPOLL - // Performance downgrades in examples. - int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0); - if (n == 0) { - n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); - } -#else - const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); -#endif -#elif defined(OS_MACOSX) struct kevent e[32]; int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); -#endif if (_stop) { - // epoll_ctl/epoll_wait should have some sort of memory fencing - // guaranteeing that we(after epoll_wait) see _stop set before - // epoll_ctl. + // EV_SET/kevent should have some sort of memory fencing + // guaranteeing that we(after kevent) see _stop set before + // EV_SET break; } if (n < 0) { @@ -305,76 +201,23 @@ void EventDispatcher::Run() { // We've checked _stop, no wake-up will be missed. continue; } -#if defined(OS_LINUX) - PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd; -#elif defined(OS_MACOSX) PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd; -#endif break; } for (int i = 0; i < n; ++i) { -#if defined(OS_LINUX) - if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP) -#ifdef BRPC_SOCKET_HAS_EOF - || (e[i].events & has_epollrdhup) -#endif - ) { - // We don't care about the return value. - Socket::StartInputEvent(e[i].data.u64, e[i].events, - _consumer_thread_attr); - } -#elif defined(OS_MACOSX) if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) { // We don't care about the return value. Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter, _consumer_thread_attr); } -#endif } for (int i = 0; i < n; ++i) { -#if defined(OS_LINUX) - if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { - // We don't care about the return value. - Socket::HandleEpollOut(e[i].data.u64); - } -#elif defined(OS_MACOSX) if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) { // We don't care about the return value. Socket::HandleEpollOut((SocketId)e[i].udata); } -#endif } } } -static EventDispatcher* g_edisp = NULL; -static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; - -static void StopAndJoinGlobalDispatchers() { - for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) { - g_edisp[i].Stop(); - g_edisp[i].Join(); - } -} -void InitializeGlobalDispatchers() { - g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num]; - for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) { - const bthread_attr_t attr = FLAGS_usercode_in_pthread ? - BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; - CHECK_EQ(0, g_edisp[i].Start(&attr)); - } - // This atexit is will be run before g_task_control.stop() because above - // Start() initializes g_task_control by creating bthread (to run epoll/kqueue). - CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers)); -} - -EventDispatcher& GetGlobalEventDispatcher(int fd) { - pthread_once(&g_edisp_once, InitializeGlobalDispatchers); - if (FLAGS_event_dispatcher_num == 1) { - return g_edisp[0]; - } - int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num; - return g_edisp[index]; -} - } // namespace brpc --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org