123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- #pragma once
- #include "acceptor_status.h"
- #include "async_result.h"
- #include "event_loop.h"
- #include "netaddr.h"
- #include "remote_connection.h"
- #include "remote_connection_status.h"
- #include "session_job_count.h"
- #include "shutdown_state.h"
- #include "ybus.h"
- #include <library/cpp/messagebus/actor/actor.h>
- #include <library/cpp/messagebus/actor/queue_in_actor.h>
- #include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
- #include <library/cpp/threading/future/legacy_future.h>
- #include <util/generic/array_ref.h>
- #include <util/generic/string.h>
- namespace NBus {
- namespace NPrivate {
- typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr;
- typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
- typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
- typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
- typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
- struct TConnectionsAcceptorsSnapshot {
- TVector<TRemoteConnectionPtr> Connections;
- TVector<TAcceptorPtr> Acceptors;
- ui64 LastConnectionId;
- ui64 LastAcceptorId;
- TConnectionsAcceptorsSnapshot();
- };
- typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr;
- struct TOnAccept {
- SOCKET s;
- TNetAddr addr;
- TInstant now;
- };
// Empty tag types. In this header they appear as template arguments of
// ::NActor::TActor / ::NActor::TQueueInActor and as the leading parameters of
// the TBusSessionImpl::Act / ProcessItem overloads, where they select which
// overload is meant.
struct TStatusTag {};
struct TConnectionTag {};
struct TDeadConnectionTag {};
struct TRemoveTag {};
- struct TBusSessionImpl
- : public virtual TBusSession,
- private ::NActor::TActor<TBusSessionImpl, TStatusTag>,
- private ::NActor::TActor<TBusSessionImpl, TConnectionTag>
- ,
- private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>,
- private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>,
- private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>
- ,
- private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>,
- private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> {
- friend class TAcceptor;
- friend class TRemoteConnection;
- friend class TRemoteServerConnection;
- friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>;
- friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>;
- public:
- ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() {
- return this;
- }
- ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() {
- return this;
- }
- ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() {
- return this;
- }
- typedef TGuard<TMutex> TConnectionsGuard;
- TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
- IBusErrorHandler* handler,
- const TBusSessionConfig& config, const TString& name);
- ~TBusSessionImpl() override;
- void Shutdown() override;
- bool IsDown();
- size_t GetInFlightImpl(const TNetAddr& addr) const;
- size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
- void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
- void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
- virtual void FillStatus();
- TSessionDumpStatus GetStatusRecordInternal() override;
- TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override;
- TConnectionStatusMonRecord GetStatusProtobuf() override;
- TString GetStatusSingleLine() override;
- void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&);
- void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&);
- void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&);
- void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&);
- void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&);
- void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr);
- void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>);
- void StatusUpdateCachedDump();
- void StatusUpdateCachedDumpIfNecessary(TInstant now);
- void Act(TStatusTag);
- void Act(TConnectionTag);
- TBusProtocol* GetProto() const noexcept override;
- const TBusSessionConfig* GetConfig() const noexcept override;
- TBusMessageQueue* GetQueue() const noexcept override;
- TString GetNameInternal() override;
- virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
- void Listen(int port, TBusMessageQueue* q);
- void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
- TBusConnection* Accept(SOCKET listen);
- inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
- return this;
- }
- inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
- return this;
- }
- typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
- void SendSnapshotToStatusActor();
- void InsertConnectionLockAcquired(TRemoteConnection* connection);
- void InsertAcceptorLockAcquired(TAcceptor* acceptor);
- void GetConnections(TVector<TRemoteConnectionPtr>*);
- void GetAcceptors(TVector<TAcceptorPtr>*);
- void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*);
- void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*);
- TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create);
- TRemoteConnectionPtr GetConnectionById(ui64 id);
- TAcceptorPtr GetAcceptorById(ui64 id);
- void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
- void Cron();
- TBusSessionJobCount JobCount;
- // TODO: replace with actor
- TMutex ConnectionsLock;
- struct TImpl;
- THolder<TImpl> Impl;
- const bool IsSource_;
- TBusMessageQueue* const Queue;
- TBusProtocol* const Proto;
- // copied to be available after Proto dies
- const TString ProtoName;
- IBusErrorHandler* const ErrorHandler;
- TUseCountHolder HandlerUseCountHolder;
- TBusSessionConfig Config; // TODO: make const
- NEventLoop::TEventLoop WriteEventLoop;
- NEventLoop::TEventLoop ReadEventLoop;
- THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread;
- THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread;
- THashMap<ui64, TRemoteConnectionPtr> ConnectionsById;
- TAddrRemoteConnections Connections;
- TAcceptorsPtrs Acceptors;
- struct TStatusData {
- TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
- ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
- TAtomicShutdownState ShutdownState;
- TBusSessionStatus Status;
- TSessionDumpStatus StatusDumpCached;
- TMutex StatusDumpCachedMutex;
- TInstant StatusDumpCachedLastUpdate;
- TStatusData();
- };
- TStatusData StatusData;
- struct TConnectionsData {
- TAtomicShutdownState ShutdownState;
- TConnectionsData();
- };
- TConnectionsData ConnectionsData;
- ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus,
- TStatusTag, TDeadConnectionTag>*
- GetDeadConnectionWriterStatusQueue() {
- return this;
- }
- ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus,
- TStatusTag, TDeadConnectionTag>*
- GetDeadConnectionReaderStatusQueue() {
- return this;
- }
- ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus,
- TStatusTag, TDeadConnectionTag>*
- GetDeadAcceptorStatusQueue() {
- return this;
- }
- template <typename TItem>
- ::NActor::IQueueInActor<TItem>* GetQueue() {
- return this;
- }
- ui64 LastAcceptorId;
- ui64 LastConnectionId;
- TAtomic Down;
- TSystemEvent ShutdownCompleteEvent;
- };
- inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
- return Proto;
- }
- inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
- return &Config;
- }
- inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
- return Queue;
- }
- }
- }
|