events_local.h 14 KB


  1. #pragma once
  2. #include <library/cpp/actors/core/events.h>
  3. #include <library/cpp/actors/core/event_local.h>
  4. #include <library/cpp/actors/protos/interconnect.pb.h>
  5. #include <util/generic/deque.h>
  6. #include <util/network/address.h>
  7. #include "interconnect_stream.h"
  8. #include "packet.h"
  9. #include "types.h"
  10. namespace NActors {
  11. struct TProgramInfo {
  12. ui64 PID = 0;
  13. ui64 StartTime = 0;
  14. ui64 Serial = 0;
  15. };
  16. enum class ENetwork : ui32 {
  17. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  18. // local messages
  19. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  20. Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP),
  21. SocketReadyRead = Start,
  22. SocketReadyWrite,
  23. SocketError,
  24. Connect,
  25. Disconnect,
  26. IncomingConnection,
  27. HandshakeAsk,
  28. HandshakeAck,
  29. HandshakeNak,
  30. HandshakeDone,
  31. HandshakeFail,
  32. Kick,
  33. Flush,
  34. NodeInfo,
  35. BunchOfEventsToDestroy,
  36. HandshakeRequest,
  37. HandshakeReplyOK,
  38. HandshakeReplyError,
  39. ResolveAddress,
  40. AddressInfo,
  41. ResolveError,
  42. HTTPStreamStatus,
  43. HTTPSendContent,
  44. ConnectProtocolWakeup,
  45. HTTPProtocolRetry,
  46. EvPollerRegister,
  47. EvPollerRegisterResult,
  48. EvPollerReady,
  49. EvUpdateFromInputSession,
  50. EvConfirmUpdate,
  51. EvSessionBufferSizeRequest,
  52. EvSessionBufferSizeResponse,
  53. EvProcessPingRequest,
  54. EvGetSecureSocket,
  55. EvSecureSocket,
  56. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  57. // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
  58. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  59. // interconnect load test message
  60. EvLoadMessage = Start + 256,
  61. };
  62. struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> {
  63. DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead")
  64. };
  65. struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> {
  66. DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite")
  67. };
  68. struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> {
  69. DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error))
  70. TString GetReason() const {
  71. return ::strerror(Error);
  72. }
  73. const int Error;
  74. TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
  75. TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock)
  76. : Error(error)
  77. , Socket(std::move(sock))
  78. {
  79. }
  80. };
  81. struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> {
  82. DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect")
  83. };
  84. struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> {
  85. DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect")
  86. TDisconnectReason Reason;
  87. TEvSocketDisconnect(TDisconnectReason reason)
  88. : Reason(std::move(reason))
  89. {
  90. }
  91. };
  92. struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
  93. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
  94. TEvHandshakeAsk(const TActorId& self,
  95. const TActorId& peer,
  96. ui64 counter)
  97. : Self(self)
  98. , Peer(peer)
  99. , Counter(counter)
  100. {
  101. }
  102. const TActorId Self;
  103. const TActorId Peer;
  104. const ui64 Counter;
  105. };
  106. struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
  107. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
  108. TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
  109. : Self(self)
  110. , NextPacket(nextPacket)
  111. , Params(std::move(params))
  112. {}
  113. const TActorId Self;
  114. const ui64 NextPacket;
  115. const TSessionParams Params;
  116. };
  117. struct TEvHandshakeNak : TEventLocal<TEvHandshakeNak, ui32(ENetwork::HandshakeNak)> {
  118. DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvHandshakeNak")
  119. };
  120. struct TEvHandshakeRequest
  121. : public TEventLocal<TEvHandshakeRequest,
  122. ui32(ENetwork::HandshakeRequest)> {
  123. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest,
  124. "Network: TEvHandshakeRequest")
  125. NActorsInterconnect::THandshakeRequest Record;
  126. };
  127. struct TEvHandshakeReplyOK
  128. : public TEventLocal<TEvHandshakeReplyOK,
  129. ui32(ENetwork::HandshakeReplyOK)> {
  130. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK,
  131. "Network: TEvHandshakeReplyOK")
  132. NActorsInterconnect::THandshakeReply Record;
  133. };
  134. struct TEvHandshakeReplyError
  135. : public TEventLocal<TEvHandshakeReplyError,
  136. ui32(ENetwork::HandshakeReplyError)> {
  137. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError,
  138. "Network: TEvHandshakeReplyError")
  139. TEvHandshakeReplyError(TString error) {
  140. Record.SetErrorExplaination(error);
  141. }
  142. NActorsInterconnect::THandshakeReply Record;
  143. };
  144. struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> {
  145. DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection")
  146. TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
  147. NInterconnect::TAddress Address;
  148. TEvIncomingConnection(TIntrusivePtr<NInterconnect::TStreamSocket> socket, NInterconnect::TAddress address)
  149. : Socket(std::move(socket))
  150. , Address(std::move(address))
  151. {}
  152. };
  153. struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> {
  154. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone")
  155. TEvHandshakeDone(
  156. TIntrusivePtr<NInterconnect::TStreamSocket> socket,
  157. const TActorId& peer,
  158. const TActorId& self,
  159. ui64 nextPacket,
  160. TAutoPtr<TProgramInfo>&& programInfo,
  161. TSessionParams params)
  162. : Socket(std::move(socket))
  163. , Peer(peer)
  164. , Self(self)
  165. , NextPacket(nextPacket)
  166. , ProgramInfo(std::move(programInfo))
  167. , Params(std::move(params))
  168. {
  169. }
  170. TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
  171. const TActorId Peer;
  172. const TActorId Self;
  173. const ui64 NextPacket;
  174. TAutoPtr<TProgramInfo> ProgramInfo;
  175. const TSessionParams Params;
  176. };
  177. struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> {
  178. DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail")
  179. enum EnumHandshakeFail {
  180. HANDSHAKE_FAIL_TRANSIENT,
  181. HANDSHAKE_FAIL_PERMANENT,
  182. HANDSHAKE_FAIL_SESSION_MISMATCH,
  183. };
  184. TEvHandshakeFail(EnumHandshakeFail temporary, TString explanation)
  185. : Temporary(temporary)
  186. , Explanation(std::move(explanation))
  187. {
  188. }
  189. const EnumHandshakeFail Temporary;
  190. const TString Explanation;
  191. };
  192. struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> {
  193. DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick")
  194. };
  195. struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> {
  196. DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush")
  197. };
  198. struct TEvLocalNodeInfo
  199. : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> {
  200. DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo")
  201. ui32 NodeId;
  202. NAddr::IRemoteAddrPtr Address;
  203. };
  204. struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> {
  205. DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy,
  206. "Network: TEvBunchOfEventsToDestroy")
  207. TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events)
  208. : Events(std::move(events))
  209. {
  210. }
  211. TDeque<TAutoPtr<IEventBase>> Events;
  212. };
  213. struct TEvResolveAddress
  214. : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> {
  215. DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress")
  216. TString Address;
  217. ui16 Port;
  218. };
  219. struct TEvAddressInfo
  220. : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> {
  221. DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo")
  222. NAddr::IRemoteAddrPtr Address;
  223. };
  224. struct TEvResolveError
  225. : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> {
  226. DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError")
  227. TString Explain;
  228. };
  229. struct TEvHTTPStreamStatus
  230. : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> {
  231. DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus,
  232. "Network: TEvHTTPStreamStatus")
  233. enum EStatus {
  234. READY,
  235. COMPLETE,
  236. ERROR,
  237. };
  238. EStatus Status;
  239. TString Error;
  240. TString HttpHeaders;
  241. };
  242. struct TEvHTTPSendContent
  243. : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> {
  244. DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent")
  245. const char* Data;
  246. size_t Len;
  247. bool Last;
  248. };
  249. struct TEvConnectWakeup
  250. : public TEventLocal<TEvConnectWakeup,
  251. ui32(ENetwork::ConnectProtocolWakeup)> {
  252. DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup")
  253. };
  254. struct TEvHTTPProtocolRetry
  255. : public TEventLocal<TEvHTTPProtocolRetry,
  256. ui32(ENetwork::HTTPProtocolRetry)> {
  257. DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry,
  258. "Protocols: TEvHTTPProtocolRetry")
  259. };
  260. struct TEvLoadMessage
  261. : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> {
  262. TEvLoadMessage() = default;
  263. template <typename TContainer>
  264. TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) {
  265. for (const TActorId& actorId : route) {
  266. auto* hop = Record.AddHops();
  267. if (actorId) {
  268. ActorIdToProto(actorId, hop->MutableNextHop());
  269. }
  270. }
  271. Record.SetId(id);
  272. if (payload) {
  273. Record.SetPayload(*payload);
  274. }
  275. }
  276. template <typename TContainer>
  277. TEvLoadMessage(const TContainer& route, const TString& id, TRope&& payload) {
  278. for (const TActorId& actorId : route) {
  279. auto* hop = Record.AddHops();
  280. if (actorId) {
  281. ActorIdToProto(actorId, hop->MutableNextHop());
  282. }
  283. }
  284. Record.SetId(id);
  285. AddPayload(std::move(payload));
  286. }
  287. };
  288. struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> {
  289. ui64 ConfirmedByInput; // latest Confirm value from processed input packet
  290. ui64 NumDataBytes;
  291. TDuration Ping;
  292. TEvUpdateFromInputSession(ui64 confirmedByInput, ui64 numDataBytes, TDuration ping)
  293. : ConfirmedByInput(confirmedByInput)
  294. , NumDataBytes(numDataBytes)
  295. , Ping(ping)
  296. {
  297. }
  298. };
  299. struct TEvConfirmUpdate : TEventLocal<TEvConfirmUpdate, static_cast<ui32>(ENetwork::EvConfirmUpdate)>
  300. {};
  301. struct TEvSessionBufferSizeRequest : TEventLocal<TEvSessionBufferSizeRequest, static_cast<ui32>(ENetwork::EvSessionBufferSizeRequest)> {
  302. //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest")
  303. DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest");
  304. };
  305. struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
  306. TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
  307. : SessionID(sessionId)
  308. , BufferSize(outputBufferSize)
  309. {
  310. }
  311. TActorId SessionID;
  312. ui64 BufferSize;
  313. };
  314. struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> {
  315. const ui64 Payload;
  316. TEvProcessPingRequest(ui64 payload)
  317. : Payload(payload)
  318. {}
  319. };
  320. struct TEvGetSecureSocket : TEventLocal<TEvGetSecureSocket, (ui32)ENetwork::EvGetSecureSocket> {
  321. TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
  322. TEvGetSecureSocket(TIntrusivePtr<NInterconnect::TStreamSocket> socket)
  323. : Socket(std::move(socket))
  324. {}
  325. };
  326. struct TEvSecureSocket : TEventLocal<TEvSecureSocket, (ui32)ENetwork::EvSecureSocket> {
  327. TIntrusivePtr<NInterconnect::TSecureSocket> Socket;
  328. TEvSecureSocket(TIntrusivePtr<NInterconnect::TSecureSocket> socket)
  329. : Socket(std::move(socket))
  330. {}
  331. };
  332. }