ybus.h 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #pragma once
  2. /// Asynchronous Messaging Library implements framework for sending and
  3. /// receiving messages between loosely connected processes.
  4. #include "coreconn.h"
  5. #include "defs.h"
  6. #include "handler.h"
  7. #include "handler_impl.h"
  8. #include "local_flags.h"
  9. #include "locator.h"
  10. #include "message.h"
  11. #include "message_status.h"
  12. #include "network.h"
  13. #include "queue_config.h"
  14. #include "remote_connection_status.h"
  15. #include "session.h"
  16. #include "session_config.h"
  17. #include "socket_addr.h"
  18. #include <library/cpp/messagebus/actor/executor.h>
  19. #include <library/cpp/messagebus/scheduler/scheduler.h>
  20. #include <library/cpp/codecs/codecs.h>
  21. #include <util/generic/array_ref.h>
  22. #include <util/generic/buffer.h>
  23. #include <util/generic/noncopyable.h>
  24. #include <util/generic/ptr.h>
  25. #include <util/stream/input.h>
  26. #include <library/cpp/deprecated/atomic/atomic.h>
  27. #include <util/system/condvar.h>
  28. #include <util/system/type_name.h>
  29. #include <util/system/event.h>
  30. #include <util/system/mutex.h>
  31. namespace NBus {
  32. ////////////////////////////////////////////////////////
  33. /// \brief Common structure to store address information
  34. int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept;
  35. bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses
  36. /////////////////////////////////////////////////////////////////////////
  37. /// \brief Handles routing and data encoding to/from wire
  38. /// Protocol is stateless threadsafe singleton object that
  39. /// encapsulates relationship between a message (TBusMessage) object
  40. /// and destination server. Protocol object is reponsible for serializing in-memory
  41. /// message and reply into the wire, retuning name of the service and resource
  42. /// distribution key for given protocol.
  43. /// Protocol object should transparently handle messages and replies.
  44. /// This is interface only class, actuall instances of the protocols
  45. /// should be created using templates inhereted from this base class.
  46. class TBusProtocol {
  47. private:
  48. TString ServiceName;
  49. int ServicePort;
  50. public:
  51. TBusProtocol(TBusService name = "UNKNOWN", int port = 0)
  52. : ServiceName(name)
  53. , ServicePort(port)
  54. {
  55. }
  56. /// returns service type for this protocol and message
  57. TBusService GetService() const {
  58. return ServiceName.data();
  59. }
  60. /// returns port number for destination session to open socket
  61. int GetPort() const {
  62. return ServicePort;
  63. }
  64. virtual ~TBusProtocol() {
  65. }
  66. /// \brief serialized protocol specific data into TBusData
  67. /// \note buffer passed to the function (data) is not empty, use append functions
  68. virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0;
  69. /// deserialized TBusData into new instance of the message
  70. virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0;
  71. /// returns key for messages of this protocol
  72. virtual TBusKey GetKey(const TBusMessage*) {
  73. return YBUS_KEYMIN;
  74. }
  75. /// default implementation of routing policy to allow overrides
  76. virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr);
  77. /// codec for transport level compression
  78. virtual NCodecs::TCodecPtr GetTransportCodec(void) const {
  79. return NCodecs::ICodec::GetInstance("snappy");
  80. }
  81. };
  82. class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> {
  83. friend class TBusMessageQueue;
  84. public:
  85. TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session);
  86. ~TBusSyncSourceSession();
  87. void Shutdown();
  88. TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr);
  89. int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
  90. int GetInFlight();
  91. const TBusProtocol* GetProto() const;
  92. const TBusClientSession* GetBusClientSessionWorkaroundDoNotUse() const; // It's for TLoadBalancedProtocol::GetDestination() function that really needs TBusClientSession* unlike all other protocols. Look at review 32425 (http://rb.yandex-team.ru/arc/r/32425/) for more information.
  93. private:
  94. TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> Session;
  95. };
  96. using TBusSyncClientSessionPtr = TIntrusivePtr<TBusSyncSourceSession>;
  97. ///////////////////////////////////////////////////////////////////
  98. /// \brief Main message queue object, need one per application
  99. class TBusMessageQueue: public TAtomicRefCount<TBusMessageQueue> {
  100. /// allow mesage queue to be created only via factory
  101. friend TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name);
  102. friend class ::NBus::NPrivate::TRemoteConnection;
  103. friend struct ::NBus::NPrivate::TBusSessionImpl;
  104. friend class ::NBus::NPrivate::TAcceptor;
  105. friend struct ::NBus::TBusServerSession;
  106. private:
  107. const TBusQueueConfig Config;
  108. TMutex Lock;
  109. TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions;
  110. TSimpleIntrusivePtr<TBusLocator> Locator;
  111. NPrivate::TScheduler Scheduler;
  112. ::NActor::TExecutorPtr WorkQueue;
  113. TAtomic Running;
  114. TSystemEvent ShutdownComplete;
  115. private:
  116. /// constructor is protected, used NBus::CreateMessageQueue() to create a instance
  117. TBusMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name);
  118. public:
  119. TString GetNameInternal() const;
  120. ~TBusMessageQueue();
  121. void Stop();
  122. bool IsRunning();
  123. public:
  124. void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) {
  125. WorkQueue->EnqueueWork(w);
  126. }
  127. ::NActor::TExecutor* GetExecutor() {
  128. return WorkQueue.Get();
  129. }
  130. TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) const;
  131. // without sessions
  132. NPrivate::TBusMessageQueueStatus GetStatusRecordInternal() const;
  133. TString GetStatusSelf() const;
  134. TString GetStatusSingleLine() const;
  135. TBusLocator* GetLocator() const {
  136. return Locator.Get();
  137. }
  138. TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = "");
  139. TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = "");
  140. TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = "");
  141. TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = "");
  142. private:
  143. void Destroy(TBusSession* session);
  144. void Destroy(TBusSyncClientSessionPtr session);
  145. public:
  146. void Schedule(NPrivate::IScheduleItemAutoPtr i);
  147. private:
  148. void DestroyAllSessions();
  149. void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session);
  150. void Remove(TBusSession* session);
  151. };
  152. /////////////////////////////////////////////////////////////////
  153. /// Factory methods to construct message queue
  154. TBusMessageQueuePtr CreateMessageQueue(const char* name = "");
  155. TBusMessageQueuePtr CreateMessageQueue(NActor::TExecutorPtr executor, const char* name = "");
  156. TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = "");
  157. TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = "");
  158. TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = "");
  159. }