poller_actor.cpp 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #include "poller_actor.h"
  2. #include "interconnect_common.h"
  3. #include <library/cpp/actors/core/actor_bootstrapped.h>
  4. #include <library/cpp/actors/core/actorsystem.h>
  5. #include <library/cpp/actors/core/hfunc.h>
  6. #include <library/cpp/actors/core/log.h>
  7. #include <library/cpp/actors/core/probes.h>
  8. #include <library/cpp/actors/protos/services_common.pb.h>
  9. #include <library/cpp/actors/util/funnel_queue.h>
  10. #include <util/generic/intrlist.h>
  11. #include <util/system/thread.h>
  12. #include <util/system/event.h>
  13. #include <util/system/pipe.h>
  14. #include <variant>
  15. namespace NActors {
  16. LWTRACE_USING(ACTORLIB_PROVIDER);
  namespace {

      // Fetches the error code left behind by the most recent failed I/O call:
      // errno everywhere except builds with _win_ defined, where the code is
      // obtained from WSAGetLastError() instead.
      int LastSocketError() {
  #if !defined(_win_)
          return errno;
  #else
          return WSAGetLastError();
  #endif
      }

  }
  26. struct TSocketRecord : TThrRefBase {
  27. const TIntrusivePtr<TSharedDescriptor> Socket;
  28. const TActorId ReadActorId;
  29. const TActorId WriteActorId;
  30. std::atomic_uint32_t Flags = 0;
  31. TSocketRecord(TEvPollerRegister& ev)
  32. : Socket(std::move(ev.Socket))
  33. , ReadActorId(ev.ReadActorId)
  34. , WriteActorId(ev.WriteActorId)
  35. {}
  36. };
  37. template<typename TDerived>
  38. class TPollerThreadBase : public ISimpleThread {
  39. protected:
  40. struct TPollerExitThread {}; // issued then we need to terminate the poller thread
  41. struct TPollerWakeup {};
  42. struct TPollerUnregisterSocket {
  43. TIntrusivePtr<TSharedDescriptor> Socket;
  44. TPollerUnregisterSocket(TIntrusivePtr<TSharedDescriptor> socket)
  45. : Socket(std::move(socket))
  46. {}
  47. };
  48. using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>;
  49. struct TPollerSyncOperationWrapper {
  50. TPollerSyncOperation Operation;
  51. TManualEvent Event;
  52. TPollerSyncOperationWrapper(TPollerSyncOperation&& operation)
  53. : Operation(std::move(operation))
  54. {}
  55. void Wait() {
  56. Event.WaitI();
  57. }
  58. void SignalDone() {
  59. Event.Signal();
  60. }
  61. };
  62. TActorSystem *ActorSystem;
  63. TPipeHandle ReadEnd, WriteEnd; // pipe for sync event processor
  64. TFunnelQueue<TPollerSyncOperationWrapper*> SyncOperationsQ; // operation queue
  65. public:
  66. TPollerThreadBase(TActorSystem *actorSystem)
  67. : ActorSystem(actorSystem)
  68. {
  69. // create a pipe for notifications
  70. try {
  71. TPipeHandle::Pipe(ReadEnd, WriteEnd, CloseOnExec);
  72. } catch (const TFileError& err) {
  73. Y_FAIL("failed to create pipe");
  74. }
  75. // switch the read/write ends to nonblocking mode
  76. SetNonBlock(ReadEnd);
  77. SetNonBlock(WriteEnd);
  78. }
  79. void UnregisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
  80. ExecuteSyncOperation(TPollerUnregisterSocket(record->Socket));
  81. }
  82. protected:
  83. void Notify(TSocketRecord *record, bool read, bool write) {
  84. auto issue = [&](const TActorId& recipient) {
  85. ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write)));
  86. };
  87. if (read && record->ReadActorId) {
  88. issue(record->ReadActorId);
  89. if (write && record->WriteActorId && record->WriteActorId != record->ReadActorId) {
  90. issue(record->WriteActorId);
  91. }
  92. } else if (write && record->WriteActorId) {
  93. issue(record->WriteActorId);
  94. }
  95. }
  96. void Stop() {
  97. // signal poller thread to stop and wait for the thread
  98. ExecuteSyncOperation(TPollerExitThread());
  99. ISimpleThread::Join();
  100. }
  101. void ExecuteSyncOperation(TPollerSyncOperation&& op) {
  102. TPollerSyncOperationWrapper wrapper(std::move(op));
  103. if (SyncOperationsQ.Push(&wrapper)) {
  104. // this was the first entry, so we push notification through the pipe
  105. for (;;) {
  106. char buffer = '\x00';
  107. ssize_t nwritten = WriteEnd.Write(&buffer, sizeof(buffer));
  108. if (nwritten < 0) {
  109. const int err = LastSocketError();
  110. if (err == EINTR) {
  111. continue;
  112. } else {
  113. Y_FAIL("WriteEnd.Write() failed with %s", strerror(err));
  114. }
  115. } else {
  116. Y_VERIFY(nwritten);
  117. break;
  118. }
  119. }
  120. }
  121. // wait for operation to complete
  122. wrapper.Wait();
  123. }
  124. bool DrainReadEnd() {
  125. size_t totalRead = 0;
  126. char buffer[4096];
  127. for (;;) {
  128. ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
  129. if (n < 0) {
  130. const int error = LastSocketError();
  131. if (error == EINTR) {
  132. continue;
  133. } else if (error == EAGAIN || error == EWOULDBLOCK) {
  134. break;
  135. } else {
  136. Y_FAIL("read() failed with %s", strerror(errno));
  137. }
  138. } else {
  139. Y_VERIFY(n);
  140. totalRead += n;
  141. }
  142. }
  143. return totalRead;
  144. }
  145. bool ProcessSyncOpQueue() {
  146. if (DrainReadEnd()) {
  147. Y_VERIFY(!SyncOperationsQ.IsEmpty());
  148. do {
  149. TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
  150. if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
  151. static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
  152. op->SignalDone();
  153. } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
  154. op->SignalDone();
  155. return false; // terminate the thread
  156. } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
  157. op->SignalDone();
  158. } else {
  159. Y_FAIL();
  160. }
  161. } while (SyncOperationsQ.Pop());
  162. }
  163. return true;
  164. }
  165. void *ThreadProc() override {
  166. SetCurrentThreadName("network poller");
  167. while (ProcessSyncOpQueue()) {
  168. static_cast<TDerived&>(*this).ProcessEventsInLoop();
  169. }
  170. return nullptr;
  171. }
  172. };
  173. } // namespace NActors
  174. #if defined(_linux_)
  175. # include "poller_actor_linux.h"
  176. #elif defined(_darwin_)
  177. # include "poller_actor_darwin.h"
  178. #elif defined(_win_)
  179. # include "poller_actor_win.h"
  180. #else
  181. # error "Unsupported platform"
  182. #endif
  183. namespace NActors {
  184. class TPollerToken::TImpl {
  185. std::weak_ptr<TPollerThread> Thread;
  186. TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked
  187. public:
  188. TImpl(std::shared_ptr<TPollerThread> thread, TIntrusivePtr<TSocketRecord> record)
  189. : Thread(thread)
  190. , Record(std::move(record))
  191. {
  192. thread->RegisterSocket(Record);
  193. }
  194. ~TImpl() {
  195. if (auto thread = Thread.lock()) {
  196. thread->UnregisterSocket(Record);
  197. }
  198. }
  199. void Request(bool read, bool write) {
  200. if (auto thread = Thread.lock()) {
  201. thread->Request(Record, read, write);
  202. }
  203. }
  204. const TIntrusivePtr<TSharedDescriptor>& Socket() const {
  205. return Record->Socket;
  206. }
  207. };
  208. class TPollerActor: public TActorBootstrapped<TPollerActor> {
  209. // poller thread
  210. std::shared_ptr<TPollerThread> PollerThread;
  211. public:
  212. static constexpr IActor::EActivityType ActorActivityType() {
  213. return IActor::INTERCONNECT_POLLER;
  214. }
  215. void Bootstrap() {
  216. PollerThread = std::make_shared<TPollerThread>(TlsActivationContext->ExecutorThread.ActorSystem);
  217. Become(&TPollerActor::StateFunc);
  218. }
  219. STRICT_STFUNC(StateFunc,
  220. hFunc(TEvPollerRegister, Handle);
  221. cFunc(TEvents::TSystem::Poison, PassAway);
  222. )
  223. void Handle(TEvPollerRegister::TPtr& ev) {
  224. auto *msg = ev->Get();
  225. auto impl = std::make_unique<TPollerToken::TImpl>(PollerThread, MakeIntrusive<TSocketRecord>(*msg));
  226. auto socket = impl->Socket();
  227. TPollerToken::TPtr token(new TPollerToken(std::move(impl)));
  228. if (msg->ReadActorId && msg->WriteActorId && msg->WriteActorId != msg->ReadActorId) {
  229. Send(msg->ReadActorId, new TEvPollerRegisterResult(socket, token));
  230. Send(msg->WriteActorId, new TEvPollerRegisterResult(socket, std::move(token)));
  231. } else if (msg->ReadActorId) {
  232. Send(msg->ReadActorId, new TEvPollerRegisterResult(socket, std::move(token)));
  233. } else if (msg->WriteActorId) {
  234. Send(msg->WriteActorId, new TEvPollerRegisterResult(socket, std::move(token)));
  235. }
  236. }
  237. };
  238. TPollerToken::TPollerToken(std::unique_ptr<TImpl> impl)
  239. : Impl(std::move(impl))
  240. {}
  241. TPollerToken::~TPollerToken()
  242. {}
  243. void TPollerToken::Request(bool read, bool write) {
  244. Impl->Request(read, write);
  245. }
  246. IActor* CreatePollerActor() {
  247. return new TPollerActor;
  248. }
  249. }