#pragma once #include "connection.h" #include "defs.h" #include "handler.h" #include "message.h" #include "netaddr.h" #include "network.h" #include "session_config.h" #include "misc/weak_ptr.h" #include #include #include namespace NBus { template class TBusSessionPtr; using TBusClientSessionPtr = TBusSessionPtr; using TBusServerSessionPtr = TBusSessionPtr; /////////////////////////////////////////////////////////////////// /// \brief Interface of session object. /// Each client and server /// should instantiate session object to be able to communicate via bus /// client: sess = queue->CreateSource(protocol, handler); /// server: sess = queue->CreateDestination(protocol, handler); class TBusSession: public TWeakRefCounted { public: size_t GetInFlight(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const; virtual void GetInFlightBulk(TArrayRef addrs, TArrayRef results) const = 0; virtual void GetConnectSyscallsNumBulkForTest(TArrayRef addrs, TArrayRef results) const = 0; virtual int GetInFlight() const noexcept = 0; /// monitoring status of current session and it's connections virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0; virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0; virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0; virtual TString GetStatusSingleLine() = 0; /// return session config virtual const TBusSessionConfig* GetConfig() const noexcept = 0; /// return session protocol virtual const TBusProtocol* GetProto() const noexcept = 0; virtual TBusMessageQueue* GetQueue() const noexcept = 0; /// registers external session on host:port with locator service int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); protected: TBusSession(); public: virtual TString GetNameInternal() = 0; virtual void Shutdown() = 0; virtual ~TBusSession(); }; struct TBusClientSession: public virtual TBusSession { typedef ::NBus::NPrivate::TRemoteClientSession TImpl; static TBusClientSessionPtr Create( TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue); virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0; /// if you want to open connection early virtual void OpenConnection(const TNetAddr&) = 0; /// Send message to the destination /// If addr is set then use it as destination. /// Takes ownership of addr (see ClearState method). virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0; /// Like SendMessage but cares about message template EMessageStatus SendMessageAutoPtr(const TAutoPtr& mes, const TNetAddr* addr = nullptr, bool wait = false) { EMessageStatus status = SendMessage(mes.Get(), addr, wait); if (status == MESSAGE_OK) Y_UNUSED(mes.Release()); return status; } /// Like SendMessageOneWay but cares about message template EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr& mes, const TNetAddr* addr = nullptr, bool wait = false) { EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait); if (status == MESSAGE_OK) Y_UNUSED(mes.Release()); return status; } EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { return SendMessageAutoPtr(message, addr, wait); } EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) { return SendMessageOneWayAutoPtr(message, addr, wait); } // TODO: implement similar one-way methods }; struct TBusServerSession: public virtual TBusSession { typedef ::NBus::NPrivate::TRemoteServerSession TImpl; static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue); static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector& bindTo); // TODO: make parameter non-const virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; // TODO: make parameter non-const virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0; template EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr& resp) { EMessageStatus status = SendReply(const_cast(ident), resp.Get()); if (status == MESSAGE_OK) { Y_UNUSED(resp.Release()); } return status; } EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) { return SendReplyAutoPtr(ident, resp); } /// Pause input from the network. /// It is valid to call this method in parallel. /// TODO: pull this method up to TBusSession. virtual void PauseInput(bool pause) = 0; virtual unsigned GetActualListenPort() = 0; }; namespace NPrivate { template class TBusOwnerSessionPtr: public TAtomicRefCount> { private: TIntrusivePtr Ptr; public: TBusOwnerSessionPtr(TBusSessionSubclass* session) : Ptr(session) { Y_ASSERT(!!Ptr); } ~TBusOwnerSessionPtr() { Ptr->Shutdown(); } TBusSessionSubclass* Get() const { return reinterpret_cast(Ptr.Get()); } }; } template class TBusSessionPtr { private: TIntrusivePtr> SmartPtr; TBusSessionSubclass* Ptr; public: TBusSessionPtr() : Ptr() { } TBusSessionPtr(TBusSessionSubclass* session) : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr(session) : nullptr) , Ptr(session) { } TBusSessionSubclass* Get() const { return Ptr; } operator TBusSessionSubclass*() { return Get(); } TBusSessionSubclass& operator*() const { return *Get(); } TBusSessionSubclass* operator->() const { return Get(); } bool operator!() const { return !Ptr; } void Swap(TBusSessionPtr& t) noexcept { DoSwap(SmartPtr, t.SmartPtr); DoSwap(Ptr, t.Ptr); } void Drop() { TBusSessionPtr().Swap(*this); } }; }