#pragma once #include "acceptor_status.h" #include "async_result.h" #include "event_loop.h" #include "netaddr.h" #include "remote_connection.h" #include "remote_connection_status.h" #include "session_job_count.h" #include "shutdown_state.h" #include "ybus.h" #include #include #include #include #include #include namespace NBus { namespace NPrivate { typedef TIntrusivePtr TRemoteClientConnectionPtr; typedef TIntrusivePtr TRemoteServerConnectionPtr; typedef TIntrusivePtr TRemoteServerSessionPtr; typedef TIntrusivePtr TAcceptorPtr; typedef TVector TAcceptorsPtrs; struct TConnectionsAcceptorsSnapshot { TVector Connections; TVector Acceptors; ui64 LastConnectionId; ui64 LastAcceptorId; TConnectionsAcceptorsSnapshot(); }; typedef TAtomicSharedPtr TConnectionsAcceptorsSnapshotPtr; struct TOnAccept { SOCKET s; TNetAddr addr; TInstant now; }; struct TStatusTag {}; struct TConnectionTag {}; struct TDeadConnectionTag {}; struct TRemoveTag {}; struct TBusSessionImpl : public virtual TBusSession, private ::NActor::TActor, private ::NActor::TActor , private ::NActor::TQueueInActor, private ::NActor::TQueueInActor, private ::NActor::TQueueInActor , private ::NActor::TQueueInActor, private ::NActor::TQueueInActor { friend class TAcceptor; friend class TRemoteConnection; friend class TRemoteServerConnection; friend class ::NActor::TActor; friend class ::NActor::TActor; friend class ::NActor::TQueueInActor; friend class ::NActor::TQueueInActor; friend class ::NActor::TQueueInActor; friend class ::NActor::TQueueInActor; friend class ::NActor::TQueueInActor; public: ::NActor::TQueueInActor* GetOnAcceptQueue() { return this; } ::NActor::TQueueInActor* GetRemoveConnectionQueue() { return this; } ::NActor::TActor* GetConnectionActor() { return this; } typedef TGuard TConnectionsGuard; TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, IBusErrorHandler* handler, const TBusSessionConfig& config, const TString& name); ~TBusSessionImpl() override; void Shutdown() override; bool IsDown(); size_t GetInFlightImpl(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; void GetInFlightBulk(TArrayRef addrs, TArrayRef results) const override; void GetConnectSyscallsNumBulkForTest(TArrayRef addrs, TArrayRef results) const override; virtual void FillStatus(); TSessionDumpStatus GetStatusRecordInternal() override; TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override; TConnectionStatusMonRecord GetStatusProtobuf() override; TString GetStatusSingleLine() override; void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&); void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&); void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&); void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&); void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&); void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr); void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr); void StatusUpdateCachedDump(); void StatusUpdateCachedDumpIfNecessary(TInstant now); void Act(TStatusTag); void Act(TConnectionTag); TBusProtocol* GetProto() const noexcept override; const TBusSessionConfig* GetConfig() const noexcept override; TBusMessageQueue* GetQueue() const noexcept override; TString GetNameInternal() override; virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps& newMsg) = 0; void Listen(int port, TBusMessageQueue* q); void Listen(const TVector& bindTo, TBusMessageQueue* q); TBusConnection* Accept(SOCKET listen); inline ::NActor::TActor* GetStatusActor() { return this; } inline ::NActor::TActor* GetConnectionsActor() { return this; } typedef THashMap TAddrRemoteConnections; void SendSnapshotToStatusActor(); void InsertConnectionLockAcquired(TRemoteConnection* connection); void InsertAcceptorLockAcquired(TAcceptor* acceptor); void GetConnections(TVector*); void GetAcceptors(TVector*); void GetConnectionsLockAquired(TVector*); void GetAcceptorsLockAquired(TVector*); TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create); TRemoteConnectionPtr GetConnectionById(ui64 id); TAcceptorPtr GetAcceptorById(ui64 id); void InvokeOnError(TNonDestroyingAutoPtr, EMessageStatus); void Cron(); TBusSessionJobCount JobCount; // TODO: replace with actor TMutex ConnectionsLock; struct TImpl; THolder Impl; const bool IsSource_; TBusMessageQueue* const Queue; TBusProtocol* const Proto; // copied to be available after Proto dies const TString ProtoName; IBusErrorHandler* const ErrorHandler; TUseCountHolder HandlerUseCountHolder; TBusSessionConfig Config; // TODO: make const NEventLoop::TEventLoop WriteEventLoop; NEventLoop::TEventLoop ReadEventLoop; THolder> ReadEventLoopThread; THolder> WriteEventLoopThread; THashMap ConnectionsById; TAddrRemoteConnections Connections; TAcceptorsPtrs Acceptors; struct TStatusData { TAtomicSharedPtr ConnectionsAcceptorsSnapshot; ::NActor::TQueueForActor> ConnectionsAcceptorsSnapshotsQueue; TAtomicShutdownState ShutdownState; TBusSessionStatus Status; TSessionDumpStatus StatusDumpCached; TMutex StatusDumpCachedMutex; TInstant StatusDumpCachedLastUpdate; TStatusData(); }; TStatusData StatusData; struct TConnectionsData { TAtomicShutdownState ShutdownState; TConnectionsData(); }; TConnectionsData ConnectionsData; ::NActor::TQueueInActor* GetDeadConnectionWriterStatusQueue() { return this; } ::NActor::TQueueInActor* GetDeadConnectionReaderStatusQueue() { return this; } ::NActor::TQueueInActor* GetDeadAcceptorStatusQueue() { return this; } template ::NActor::IQueueInActor* GetQueue() { return this; } ui64 LastAcceptorId; ui64 LastConnectionId; TAtomic Down; TSystemEvent ShutdownCompleteEvent; }; inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { return Proto; } inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { return &Config; } inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { return Queue; } } }