123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- #pragma once
- #include "connection.h"
- #include "defs.h"
- #include "handler.h"
- #include "message.h"
- #include "netaddr.h"
- #include "network.h"
- #include "session_config.h"
- #include "misc/weak_ptr.h"
- #include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
- #include <util/generic/array_ref.h>
- #include <util/generic/ptr.h>
- namespace NBus {
- template <typename TBusSessionSubclass>
- class TBusSessionPtr;
- using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
- using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
- ///////////////////////////////////////////////////////////////////
- /// \brief Interface of session object.
- /// Each client and server
- /// should instantiate session object to be able to communicate via bus
- /// client: sess = queue->CreateSource(protocol, handler);
- /// server: sess = queue->CreateDestination(protocol, handler);
- class TBusSession: public TWeakRefCounted<TBusSession> {
- public:
- size_t GetInFlight(const TNetAddr& addr) const;
- size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
- virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
- virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
- virtual int GetInFlight() const noexcept = 0;
- /// monitoring status of current session and it's connections
- virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
- virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
- virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
- virtual TString GetStatusSingleLine() = 0;
- /// return session config
- virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
- /// return session protocol
- virtual const TBusProtocol* GetProto() const noexcept = 0;
- virtual TBusMessageQueue* GetQueue() const noexcept = 0;
- /// registers external session on host:port with locator service
- int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
- protected:
- TBusSession();
- public:
- virtual TString GetNameInternal() = 0;
- virtual void Shutdown() = 0;
- virtual ~TBusSession();
- };
- struct TBusClientSession: public virtual TBusSession {
- typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
- static TBusClientSessionPtr Create(
- TBusProtocol* proto,
- IBusClientHandler* handler,
- const TBusClientSessionConfig& config,
- TBusMessageQueuePtr queue);
- virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
- /// if you want to open connection early
- virtual void OpenConnection(const TNetAddr&) = 0;
- /// Send message to the destination
- /// If addr is set then use it as destination.
- /// Takes ownership of addr (see ClearState method).
- virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
- virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
- /// Like SendMessage but cares about message
- template <typename T /* <: TBusMessage */>
- EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
- EMessageStatus status = SendMessage(mes.Get(), addr, wait);
- if (status == MESSAGE_OK)
- Y_UNUSED(mes.Release());
- return status;
- }
- /// Like SendMessageOneWay but cares about message
- template <typename T /* <: TBusMessage */>
- EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
- EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);
- if (status == MESSAGE_OK)
- Y_UNUSED(mes.Release());
- return status;
- }
- EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
- return SendMessageAutoPtr(message, addr, wait);
- }
- EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
- return SendMessageOneWayAutoPtr(message, addr, wait);
- }
- // TODO: implement similar one-way methods
- };
- struct TBusServerSession: public virtual TBusSession {
- typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
- static TBusServerSessionPtr Create(
- TBusProtocol* proto,
- IBusServerHandler* handler,
- const TBusServerSessionConfig& config,
- TBusMessageQueuePtr queue);
- static TBusServerSessionPtr Create(
- TBusProtocol* proto,
- IBusServerHandler* handler,
- const TBusServerSessionConfig& config,
- TBusMessageQueuePtr queue,
- const TVector<TBindResult>& bindTo);
- // TODO: make parameter non-const
- virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
- // TODO: make parameter non-const
- virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
- template <typename U /* <: TBusMessage */>
- EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
- EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
- if (status == MESSAGE_OK) {
- Y_UNUSED(resp.Release());
- }
- return status;
- }
- EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
- return SendReplyAutoPtr(ident, resp);
- }
- /// Pause input from the network.
- /// It is valid to call this method in parallel.
- /// TODO: pull this method up to TBusSession.
- virtual void PauseInput(bool pause) = 0;
- virtual unsigned GetActualListenPort() = 0;
- };
- namespace NPrivate {
- template <typename TBusSessionSubclass>
- class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
- private:
- TIntrusivePtr<TBusSessionSubclass> Ptr;
- public:
- TBusOwnerSessionPtr(TBusSessionSubclass* session)
- : Ptr(session)
- {
- Y_ASSERT(!!Ptr);
- }
- ~TBusOwnerSessionPtr() {
- Ptr->Shutdown();
- }
- TBusSessionSubclass* Get() const {
- return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
- }
- };
- }
- template <typename TBusSessionSubclass>
- class TBusSessionPtr {
- private:
- TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
- TBusSessionSubclass* Ptr;
- public:
- TBusSessionPtr()
- : Ptr()
- {
- }
- TBusSessionPtr(TBusSessionSubclass* session)
- : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
- , Ptr(session)
- {
- }
- TBusSessionSubclass* Get() const {
- return Ptr;
- }
- operator TBusSessionSubclass*() {
- return Get();
- }
- TBusSessionSubclass& operator*() const {
- return *Get();
- }
- TBusSessionSubclass* operator->() const {
- return Get();
- }
- bool operator!() const {
- return !Ptr;
- }
- void Swap(TBusSessionPtr& t) noexcept {
- DoSwap(SmartPtr, t.SmartPtr);
- DoSwap(Ptr, t.Ptr);
- }
- void Drop() {
- TBusSessionPtr().Swap(*this);
- }
- };
- }
|