123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- #pragma once
- #include <library/cpp/actors/core/events.h>
- #include <library/cpp/actors/core/event_local.h>
- #include <library/cpp/actors/protos/interconnect.pb.h>
- #include <util/generic/deque.h>
- #include <util/network/address.h>
- #include "interconnect_stream.h"
- #include "packet.h"
- #include "types.h"
- namespace NActors {
- struct TProgramInfo {
- ui64 PID = 0;
- ui64 StartTime = 0;
- ui64 Serial = 0;
- };
- enum class ENetwork : ui32 {
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // local messages
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP),
- SocketReadyRead = Start,
- SocketReadyWrite,
- SocketError,
- Connect,
- Disconnect,
- IncomingConnection,
- HandshakeAsk,
- HandshakeAck,
- HandshakeNak,
- HandshakeDone,
- HandshakeFail,
- Kick,
- Flush,
- NodeInfo,
- BunchOfEventsToDestroy,
- HandshakeRequest,
- HandshakeReplyOK,
- HandshakeReplyError,
- ResolveAddress,
- AddressInfo,
- ResolveError,
- HTTPStreamStatus,
- HTTPSendContent,
- ConnectProtocolWakeup,
- HTTPProtocolRetry,
- EvPollerRegister,
- EvPollerRegisterResult,
- EvPollerReady,
- EvUpdateFromInputSession,
- EvConfirmUpdate,
- EvSessionBufferSizeRequest,
- EvSessionBufferSizeResponse,
- EvProcessPingRequest,
- EvGetSecureSocket,
- EvSecureSocket,
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // interconnect load test message
- EvLoadMessage = Start + 256,
- };
- struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead")
- };
- struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite")
- };
- struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error))
- TString GetReason() const {
- return ::strerror(Error);
- }
- const int Error;
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock)
- : Error(error)
- , Socket(std::move(sock))
- {
- }
- };
- struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect")
- };
- struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect")
- TDisconnectReason Reason;
- TEvSocketDisconnect(TDisconnectReason reason)
- : Reason(std::move(reason))
- {
- }
- };
- struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
- TEvHandshakeAsk(const TActorId& self,
- const TActorId& peer,
- ui64 counter)
- : Self(self)
- , Peer(peer)
- , Counter(counter)
- {
- }
- const TActorId Self;
- const TActorId Peer;
- const ui64 Counter;
- };
- struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck")
- TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params)
- : Self(self)
- , NextPacket(nextPacket)
- , Params(std::move(params))
- {}
- const TActorId Self;
- const ui64 NextPacket;
- const TSessionParams Params;
- };
- struct TEvHandshakeNak : TEventLocal<TEvHandshakeNak, ui32(ENetwork::HandshakeNak)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvHandshakeNak")
- };
- struct TEvHandshakeRequest
- : public TEventLocal<TEvHandshakeRequest,
- ui32(ENetwork::HandshakeRequest)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest,
- "Network: TEvHandshakeRequest")
- NActorsInterconnect::THandshakeRequest Record;
- };
- struct TEvHandshakeReplyOK
- : public TEventLocal<TEvHandshakeReplyOK,
- ui32(ENetwork::HandshakeReplyOK)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK,
- "Network: TEvHandshakeReplyOK")
- NActorsInterconnect::THandshakeReply Record;
- };
- struct TEvHandshakeReplyError
- : public TEventLocal<TEvHandshakeReplyError,
- ui32(ENetwork::HandshakeReplyError)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError,
- "Network: TEvHandshakeReplyError")
- TEvHandshakeReplyError(TString error) {
- Record.SetErrorExplaination(error);
- }
- NActorsInterconnect::THandshakeReply Record;
- };
- struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection")
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- NInterconnect::TAddress Address;
- TEvIncomingConnection(TIntrusivePtr<NInterconnect::TStreamSocket> socket, NInterconnect::TAddress address)
- : Socket(std::move(socket))
- , Address(std::move(address))
- {}
- };
- struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone")
- TEvHandshakeDone(
- TIntrusivePtr<NInterconnect::TStreamSocket> socket,
- const TActorId& peer,
- const TActorId& self,
- ui64 nextPacket,
- TAutoPtr<TProgramInfo>&& programInfo,
- TSessionParams params)
- : Socket(std::move(socket))
- , Peer(peer)
- , Self(self)
- , NextPacket(nextPacket)
- , ProgramInfo(std::move(programInfo))
- , Params(std::move(params))
- {
- }
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- const TActorId Peer;
- const TActorId Self;
- const ui64 NextPacket;
- TAutoPtr<TProgramInfo> ProgramInfo;
- const TSessionParams Params;
- };
- struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail")
- enum EnumHandshakeFail {
- HANDSHAKE_FAIL_TRANSIENT,
- HANDSHAKE_FAIL_PERMANENT,
- HANDSHAKE_FAIL_SESSION_MISMATCH,
- };
- TEvHandshakeFail(EnumHandshakeFail temporary, TString explanation)
- : Temporary(temporary)
- , Explanation(std::move(explanation))
- {
- }
- const EnumHandshakeFail Temporary;
- const TString Explanation;
- };
- struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick")
- };
- struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush")
- };
- struct TEvLocalNodeInfo
- : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo")
- ui32 NodeId;
- NAddr::IRemoteAddrPtr Address;
- };
- struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy,
- "Network: TEvBunchOfEventsToDestroy")
- TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events)
- : Events(std::move(events))
- {
- }
- TDeque<TAutoPtr<IEventBase>> Events;
- };
- struct TEvResolveAddress
- : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress")
- TString Address;
- ui16 Port;
- };
- struct TEvAddressInfo
- : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo")
- NAddr::IRemoteAddrPtr Address;
- };
- struct TEvResolveError
- : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError")
- TString Explain;
- };
- struct TEvHTTPStreamStatus
- : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus,
- "Network: TEvHTTPStreamStatus")
- enum EStatus {
- READY,
- COMPLETE,
- ERROR,
- };
- EStatus Status;
- TString Error;
- TString HttpHeaders;
- };
- struct TEvHTTPSendContent
- : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent")
- const char* Data;
- size_t Len;
- bool Last;
- };
- struct TEvConnectWakeup
- : public TEventLocal<TEvConnectWakeup,
- ui32(ENetwork::ConnectProtocolWakeup)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup")
- };
- struct TEvHTTPProtocolRetry
- : public TEventLocal<TEvHTTPProtocolRetry,
- ui32(ENetwork::HTTPProtocolRetry)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry,
- "Protocols: TEvHTTPProtocolRetry")
- };
- struct TEvLoadMessage
- : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> {
- TEvLoadMessage() = default;
- template <typename TContainer>
- TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) {
- for (const TActorId& actorId : route) {
- auto* hop = Record.AddHops();
- if (actorId) {
- ActorIdToProto(actorId, hop->MutableNextHop());
- }
- }
- Record.SetId(id);
- if (payload) {
- Record.SetPayload(*payload);
- }
- }
- template <typename TContainer>
- TEvLoadMessage(const TContainer& route, const TString& id, TRope&& payload) {
- for (const TActorId& actorId : route) {
- auto* hop = Record.AddHops();
- if (actorId) {
- ActorIdToProto(actorId, hop->MutableNextHop());
- }
- }
- Record.SetId(id);
- AddPayload(std::move(payload));
- }
- };
- struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> {
- ui64 ConfirmedByInput; // latest Confirm value from processed input packet
- ui64 NumDataBytes;
- TDuration Ping;
- TEvUpdateFromInputSession(ui64 confirmedByInput, ui64 numDataBytes, TDuration ping)
- : ConfirmedByInput(confirmedByInput)
- , NumDataBytes(numDataBytes)
- , Ping(ping)
- {
- }
- };
- struct TEvConfirmUpdate : TEventLocal<TEvConfirmUpdate, static_cast<ui32>(ENetwork::EvConfirmUpdate)>
- {};
- struct TEvSessionBufferSizeRequest : TEventLocal<TEvSessionBufferSizeRequest, static_cast<ui32>(ENetwork::EvSessionBufferSizeRequest)> {
- //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest")
- DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest");
- };
- struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> {
- TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize)
- : SessionID(sessionId)
- , BufferSize(outputBufferSize)
- {
- }
- TActorId SessionID;
- ui64 BufferSize;
- };
- struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> {
- const ui64 Payload;
- TEvProcessPingRequest(ui64 payload)
- : Payload(payload)
- {}
- };
- struct TEvGetSecureSocket : TEventLocal<TEvGetSecureSocket, (ui32)ENetwork::EvGetSecureSocket> {
- TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
- TEvGetSecureSocket(TIntrusivePtr<NInterconnect::TStreamSocket> socket)
- : Socket(std::move(socket))
- {}
- };
- struct TEvSecureSocket : TEventLocal<TEvSecureSocket, (ui32)ENetwork::EvSecureSocket> {
- TIntrusivePtr<NInterconnect::TSecureSocket> Socket;
- TEvSecureSocket(TIntrusivePtr<NInterconnect::TSecureSocket> socket)
- : Socket(std::move(socket))
- {}
- };
- }
|