session.h 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #pragma once
  2. #include "connection.h"
  3. #include "defs.h"
  4. #include "handler.h"
  5. #include "message.h"
  6. #include "netaddr.h"
  7. #include "network.h"
  8. #include "session_config.h"
  9. #include "misc/weak_ptr.h"
  10. #include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
  11. #include <util/generic/array_ref.h>
  12. #include <util/generic/ptr.h>
  13. namespace NBus {
  14. template <typename TBusSessionSubclass>
  15. class TBusSessionPtr;
  16. using TBusClientSessionPtr = TBusSessionPtr<TBusClientSession>;
  17. using TBusServerSessionPtr = TBusSessionPtr<TBusServerSession>;
  18. ///////////////////////////////////////////////////////////////////
  19. /// \brief Interface of session object.
  20. /// Each client and server
  21. /// should instantiate session object to be able to communicate via bus
  22. /// client: sess = queue->CreateSource(protocol, handler);
  23. /// server: sess = queue->CreateDestination(protocol, handler);
  24. class TBusSession: public TWeakRefCounted<TBusSession> {
  25. public:
  26. size_t GetInFlight(const TNetAddr& addr) const;
  27. size_t GetConnectSyscallsNumForTest(const TNetAddr& addr) const;
  28. virtual void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
  29. virtual void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const = 0;
  30. virtual int GetInFlight() const noexcept = 0;
  31. /// monitoring status of current session and it's connections
  32. virtual TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) = 0;
  33. virtual TConnectionStatusMonRecord GetStatusProtobuf() = 0;
  34. virtual NPrivate::TSessionDumpStatus GetStatusRecordInternal() = 0;
  35. virtual TString GetStatusSingleLine() = 0;
  36. /// return session config
  37. virtual const TBusSessionConfig* GetConfig() const noexcept = 0;
  38. /// return session protocol
  39. virtual const TBusProtocol* GetProto() const noexcept = 0;
  40. virtual TBusMessageQueue* GetQueue() const noexcept = 0;
  41. /// registers external session on host:port with locator service
  42. int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
  43. protected:
  44. TBusSession();
  45. public:
  46. virtual TString GetNameInternal() = 0;
  47. virtual void Shutdown() = 0;
  48. virtual ~TBusSession();
  49. };
  50. struct TBusClientSession: public virtual TBusSession {
  51. typedef ::NBus::NPrivate::TRemoteClientSession TImpl;
  52. static TBusClientSessionPtr Create(
  53. TBusProtocol* proto,
  54. IBusClientHandler* handler,
  55. const TBusClientSessionConfig& config,
  56. TBusMessageQueuePtr queue);
  57. virtual TBusClientConnectionPtr GetConnection(const TNetAddr&) = 0;
  58. /// if you want to open connection early
  59. virtual void OpenConnection(const TNetAddr&) = 0;
  60. /// Send message to the destination
  61. /// If addr is set then use it as destination.
  62. /// Takes ownership of addr (see ClearState method).
  63. virtual EMessageStatus SendMessage(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
  64. virtual EMessageStatus SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr = nullptr, bool wait = false) = 0;
  65. /// Like SendMessage but cares about message
  66. template <typename T /* <: TBusMessage */>
  67. EMessageStatus SendMessageAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
  68. EMessageStatus status = SendMessage(mes.Get(), addr, wait);
  69. if (status == MESSAGE_OK)
  70. Y_UNUSED(mes.Release());
  71. return status;
  72. }
  73. /// Like SendMessageOneWay but cares about message
  74. template <typename T /* <: TBusMessage */>
  75. EMessageStatus SendMessageOneWayAutoPtr(const TAutoPtr<T>& mes, const TNetAddr* addr = nullptr, bool wait = false) {
  76. EMessageStatus status = SendMessageOneWay(mes.Get(), addr, wait);
  77. if (status == MESSAGE_OK)
  78. Y_UNUSED(mes.Release());
  79. return status;
  80. }
  81. EMessageStatus SendMessageMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
  82. return SendMessageAutoPtr(message, addr, wait);
  83. }
  84. EMessageStatus SendMessageOneWayMove(TBusMessageAutoPtr message, const TNetAddr* addr = nullptr, bool wait = false) {
  85. return SendMessageOneWayAutoPtr(message, addr, wait);
  86. }
  87. // TODO: implement similar one-way methods
  88. };
  89. struct TBusServerSession: public virtual TBusSession {
  90. typedef ::NBus::NPrivate::TRemoteServerSession TImpl;
  91. static TBusServerSessionPtr Create(
  92. TBusProtocol* proto,
  93. IBusServerHandler* handler,
  94. const TBusServerSessionConfig& config,
  95. TBusMessageQueuePtr queue);
  96. static TBusServerSessionPtr Create(
  97. TBusProtocol* proto,
  98. IBusServerHandler* handler,
  99. const TBusServerSessionConfig& config,
  100. TBusMessageQueuePtr queue,
  101. const TVector<TBindResult>& bindTo);
  102. // TODO: make parameter non-const
  103. virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
  104. // TODO: make parameter non-const
  105. virtual EMessageStatus ForgetRequest(const TBusIdentity& ident) = 0;
  106. template <typename U /* <: TBusMessage */>
  107. EMessageStatus SendReplyAutoPtr(TBusIdentity& ident, TAutoPtr<U>& resp) {
  108. EMessageStatus status = SendReply(const_cast<const TBusIdentity&>(ident), resp.Get());
  109. if (status == MESSAGE_OK) {
  110. Y_UNUSED(resp.Release());
  111. }
  112. return status;
  113. }
  114. EMessageStatus SendReplyMove(TBusIdentity& ident, TBusMessageAutoPtr resp) {
  115. return SendReplyAutoPtr(ident, resp);
  116. }
  117. /// Pause input from the network.
  118. /// It is valid to call this method in parallel.
  119. /// TODO: pull this method up to TBusSession.
  120. virtual void PauseInput(bool pause) = 0;
  121. virtual unsigned GetActualListenPort() = 0;
  122. };
  123. namespace NPrivate {
  124. template <typename TBusSessionSubclass>
  125. class TBusOwnerSessionPtr: public TAtomicRefCount<TBusOwnerSessionPtr<TBusSessionSubclass>> {
  126. private:
  127. TIntrusivePtr<TBusSessionSubclass> Ptr;
  128. public:
  129. TBusOwnerSessionPtr(TBusSessionSubclass* session)
  130. : Ptr(session)
  131. {
  132. Y_ASSERT(!!Ptr);
  133. }
  134. ~TBusOwnerSessionPtr() {
  135. Ptr->Shutdown();
  136. }
  137. TBusSessionSubclass* Get() const {
  138. return reinterpret_cast<TBusSessionSubclass*>(Ptr.Get());
  139. }
  140. };
  141. }
  142. template <typename TBusSessionSubclass>
  143. class TBusSessionPtr {
  144. private:
  145. TIntrusivePtr<NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>> SmartPtr;
  146. TBusSessionSubclass* Ptr;
  147. public:
  148. TBusSessionPtr()
  149. : Ptr()
  150. {
  151. }
  152. TBusSessionPtr(TBusSessionSubclass* session)
  153. : SmartPtr(!!session ? new NPrivate::TBusOwnerSessionPtr<TBusSessionSubclass>(session) : nullptr)
  154. , Ptr(session)
  155. {
  156. }
  157. TBusSessionSubclass* Get() const {
  158. return Ptr;
  159. }
  160. operator TBusSessionSubclass*() {
  161. return Get();
  162. }
  163. TBusSessionSubclass& operator*() const {
  164. return *Get();
  165. }
  166. TBusSessionSubclass* operator->() const {
  167. return Get();
  168. }
  169. bool operator!() const {
  170. return !Ptr;
  171. }
  172. void Swap(TBusSessionPtr& t) noexcept {
  173. DoSwap(SmartPtr, t.SmartPtr);
  174. DoSwap(Ptr, t.Ptr);
  175. }
  176. void Drop() {
  177. TBusSessionPtr().Swap(*this);
  178. }
  179. };
  180. }