event_loop.cpp 8.1 KB


  1. #include "event_loop.h"
  2. #include "network.h"
  3. #include "thread_extra.h"
  4. #include <library/cpp/deprecated/atomic/atomic.h>
  5. #include <util/generic/hash.h>
  6. #include <util/network/pair.h>
  7. #include <util/network/poller.h>
  8. #include <util/system/event.h>
  9. #include <util/system/mutex.h>
  10. #include <util/system/thread.h>
  11. #include <util/system/yassert.h>
  12. #include <util/thread/lfqueue.h>
  13. #include <errno.h>
  14. using namespace NEventLoop;
  15. namespace {
  16. enum ERunningState {
  17. EVENT_LOOP_CREATED,
  18. EVENT_LOOP_RUNNING,
  19. EVENT_LOOP_STOPPED,
  20. };
  21. enum EOperation {
  22. OP_READ = 1,
  23. OP_WRITE = 2,
  24. OP_READ_WRITE = OP_READ | OP_WRITE,
  25. };
  26. }
  27. class TChannel::TImpl {
  28. public:
  29. TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie);
  30. ~TImpl();
  31. void EnableRead();
  32. void DisableRead();
  33. void EnableWrite();
  34. void DisableWrite();
  35. void Unregister();
  36. SOCKET GetSocket() const;
  37. TSocket GetSocketPtr() const;
  38. void Update(int pollerFlags, bool enable);
  39. void CallHandler();
  40. TEventLoop::TImpl* EventLoop;
  41. TSocket Socket;
  42. TEventHandlerPtr EventHandler;
  43. void* Cookie;
  44. TMutex Mutex;
  45. int CurrentFlags;
  46. bool Close;
  47. };
  48. class TEventLoop::TImpl {
  49. public:
  50. TImpl(const char* name);
  51. void Run();
  52. void Wakeup();
  53. void Stop();
  54. TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie);
  55. void Unregister(SOCKET socket);
  56. typedef THashMap<SOCKET, TChannelPtr> TData;
  57. void AddToPoller(SOCKET socket, void* cookie, int flags);
  58. TMutex Mutex;
  59. const char* Name;
  60. TAtomic RunningState;
  61. TAtomic StopSignal;
  62. TSystemEvent StoppedEvent;
  63. TData Data;
  64. TLockFreeQueue<SOCKET> SocketsToRemove;
  65. TSocketPoller Poller;
  66. TSocketHolder WakeupReadSocket;
  67. TSocketHolder WakeupWriteSocket;
  68. };
  69. TChannel::~TChannel() {
  70. }
  71. void TChannel::EnableRead() {
  72. Impl->EnableRead();
  73. }
  74. void TChannel::DisableRead() {
  75. Impl->DisableRead();
  76. }
  77. void TChannel::EnableWrite() {
  78. Impl->EnableWrite();
  79. }
  80. void TChannel::DisableWrite() {
  81. Impl->DisableWrite();
  82. }
  83. void TChannel::Unregister() {
  84. Impl->Unregister();
  85. }
  86. SOCKET TChannel::GetSocket() const {
  87. return Impl->GetSocket();
  88. }
  89. TSocket TChannel::GetSocketPtr() const {
  90. return Impl->GetSocketPtr();
  91. }
  92. TChannel::TChannel(TImpl* impl)
  93. : Impl(impl)
  94. {
  95. }
  96. TEventLoop::TEventLoop(const char* name)
  97. : Impl(new TImpl(name))
  98. {
  99. }
  100. TEventLoop::~TEventLoop() {
  101. }
  102. void TEventLoop::Run() {
  103. Impl->Run();
  104. }
  105. void TEventLoop::Stop() {
  106. Impl->Stop();
  107. }
  108. bool TEventLoop::IsRunning() {
  109. return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
  110. }
  111. TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
  112. return Impl->Register(socket, eventHandler, cookie);
  113. }
  114. TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie)
  115. : EventLoop(eventLoop)
  116. , Socket(socket)
  117. , EventHandler(eventHandler)
  118. , Cookie(cookie)
  119. , CurrentFlags(0)
  120. , Close(false)
  121. {
  122. }
  123. TChannel::TImpl::~TImpl() {
  124. Y_ASSERT(Close);
  125. }
  126. void TChannel::TImpl::EnableRead() {
  127. Update(OP_READ, true);
  128. }
  129. void TChannel::TImpl::DisableRead() {
  130. Update(OP_READ, false);
  131. }
  132. void TChannel::TImpl::EnableWrite() {
  133. Update(OP_WRITE, true);
  134. }
  135. void TChannel::TImpl::DisableWrite() {
  136. Update(OP_WRITE, false);
  137. }
  138. void TChannel::TImpl::Unregister() {
  139. TGuard<TMutex> guard(Mutex);
  140. if (Close) {
  141. return;
  142. }
  143. Close = true;
  144. if (CurrentFlags != 0) {
  145. EventLoop->Poller.Unwait(Socket);
  146. CurrentFlags = 0;
  147. }
  148. EventHandler.Drop();
  149. EventLoop->SocketsToRemove.Enqueue(Socket);
  150. EventLoop->Wakeup();
  151. }
  152. void TChannel::TImpl::Update(int flags, bool enable) {
  153. TGuard<TMutex> guard(Mutex);
  154. if (Close) {
  155. return;
  156. }
  157. int newFlags = enable ? (CurrentFlags | flags) : (CurrentFlags & ~flags);
  158. if (CurrentFlags == newFlags) {
  159. return;
  160. }
  161. if (!newFlags) {
  162. EventLoop->Poller.Unwait(Socket);
  163. } else {
  164. void* cookie = reinterpret_cast<void*>(this);
  165. EventLoop->AddToPoller(Socket, cookie, newFlags);
  166. }
  167. CurrentFlags = newFlags;
  168. }
  169. SOCKET TChannel::TImpl::GetSocket() const {
  170. return Socket;
  171. }
  172. TSocket TChannel::TImpl::GetSocketPtr() const {
  173. return Socket;
  174. }
  175. void TChannel::TImpl::CallHandler() {
  176. TEventHandlerPtr handler;
  177. {
  178. TGuard<TMutex> guard(Mutex);
  179. // other thread may have re-added socket to epoll
  180. // so even if CurrentFlags is 0, epoll may fire again
  181. // so please use non-blocking operations
  182. CurrentFlags = 0;
  183. if (Close) {
  184. return;
  185. }
  186. handler = EventHandler;
  187. }
  188. if (!!handler) {
  189. handler->HandleEvent(Socket, Cookie);
  190. }
  191. }
  192. TEventLoop::TImpl::TImpl(const char* name)
  193. : Name(name)
  194. , RunningState(EVENT_LOOP_CREATED)
  195. , StopSignal(0)
  196. {
  197. SOCKET wakeupSockets[2];
  198. if (SocketPair(wakeupSockets) < 0) {
  199. Y_ABORT("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
  200. }
  201. TSocketHolder wakeupReadSocket(wakeupSockets[0]);
  202. TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
  203. WakeupReadSocket.Swap(wakeupReadSocket);
  204. WakeupWriteSocket.Swap(wakeupWriteSocket);
  205. SetNonBlock(WakeupWriteSocket, true);
  206. SetNonBlock(WakeupReadSocket, true);
  207. Poller.WaitRead(WakeupReadSocket,
  208. reinterpret_cast<void*>(this));
  209. }
  210. void TEventLoop::TImpl::Run() {
  211. bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
  212. Y_ABORT_UNLESS(res, "Invalid mbus event loop state");
  213. if (!!Name) {
  214. SetCurrentThreadName(Name);
  215. }
  216. while (AtomicGet(StopSignal) == 0) {
  217. void* cookies[1024];
  218. const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
  219. void** end = cookies + count;
  220. for (void** c = cookies; c != end; ++c) {
  221. TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c);
  222. if (*c == this) {
  223. char buf[0x1000];
  224. if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
  225. Y_ABORT("failed to recv from wakeup socket: %s", LastSystemErrorText());
  226. }
  227. continue;
  228. }
  229. s->CallHandler();
  230. }
  231. SOCKET socket = -1;
  232. while (SocketsToRemove.Dequeue(&socket)) {
  233. TGuard<TMutex> guard(Mutex);
  234. Y_ABORT_UNLESS(Data.erase(socket) == 1, "must be removed once");
  235. }
  236. }
  237. {
  238. TGuard<TMutex> guard(Mutex);
  239. for (auto& it : Data) {
  240. it.second->Unregister();
  241. }
  242. // release file descriptors
  243. Data.clear();
  244. }
  245. res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
  246. Y_ABORT_UNLESS(res);
  247. StoppedEvent.Signal();
  248. }
  249. void TEventLoop::TImpl::Stop() {
  250. AtomicSet(StopSignal, 1);
  251. if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
  252. Wakeup();
  253. StoppedEvent.WaitI();
  254. }
  255. }
  256. TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
  257. Y_ABORT_UNLESS(socket != INVALID_SOCKET, "must be a valid socket");
  258. TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
  259. TGuard<TMutex> guard(Mutex);
  260. Y_ABORT_UNLESS(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
  261. return channel;
  262. }
  263. void TEventLoop::TImpl::Wakeup() {
  264. if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) {
  265. if (LastSystemError() != EAGAIN) {
  266. Y_ABORT("failed to send to wakeup socket: %s", LastSystemErrorText());
  267. }
  268. }
  269. }
  270. void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
  271. if (flags == OP_READ) {
  272. Poller.WaitReadOneShot(socket, cookie);
  273. } else if (flags == OP_WRITE) {
  274. Poller.WaitWriteOneShot(socket, cookie);
  275. } else if (flags == OP_READ_WRITE) {
  276. Poller.WaitReadWriteOneShot(socket, cookie);
  277. } else {
  278. Y_ABORT("Wrong flags: %d", int(flags));
  279. }
  280. }