#pragma once /// Asynchronous Messaging Library implements framework for sending and /// receiving messages between loosely connected processes. #include "coreconn.h" #include "defs.h" #include "handler.h" #include "handler_impl.h" #include "local_flags.h" #include "locator.h" #include "message.h" #include "message_status.h" #include "network.h" #include "queue_config.h" #include "remote_connection_status.h" #include "session.h" #include "session_config.h" #include "socket_addr.h" #include #include #include #include #include #include #include #include #include #include #include #include #include namespace NBus { //////////////////////////////////////////////////////// /// \brief Common structure to store address information int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept; bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses ///////////////////////////////////////////////////////////////////////// /// \brief Handles routing and data encoding to/from wire /// Protocol is stateless threadsafe singleton object that /// encapsulates relationship between a message (TBusMessage) object /// and destination server. Protocol object is reponsible for serializing in-memory /// message and reply into the wire, retuning name of the service and resource /// distribution key for given protocol. /// Protocol object should transparently handle messages and replies. /// This is interface only class, actuall instances of the protocols /// should be created using templates inhereted from this base class. class TBusProtocol { private: TString ServiceName; int ServicePort; public: TBusProtocol(TBusService name = "UNKNOWN", int port = 0) : ServiceName(name) , ServicePort(port) { } /// returns service type for this protocol and message TBusService GetService() const { return ServiceName.data(); } /// returns port number for destination session to open socket int GetPort() const { return ServicePort; } virtual ~TBusProtocol() { } /// \brief serialized protocol specific data into TBusData /// \note buffer passed to the function (data) is not empty, use append functions virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0; /// deserialized TBusData into new instance of the message virtual TAutoPtr Deserialize(ui16 messageType, TArrayRef payload) = 0; /// returns key for messages of this protocol virtual TBusKey GetKey(const TBusMessage*) { return YBUS_KEYMIN; } /// default implementation of routing policy to allow overrides virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr); /// codec for transport level compression virtual NCodecs::TCodecPtr GetTransportCodec(void) const { return NCodecs::ICodec::GetInstance("snappy"); } }; class TBusSyncSourceSession: public TAtomicRefCount { friend class TBusMessageQueue; public: TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session); ~TBusSyncSourceSession(); void Shutdown(); TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr); int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4); int GetInFlight(); const TBusProtocol* GetProto() const; const TBusClientSession* GetBusClientSessionWorkaroundDoNotUse() const; // It's for TLoadBalancedProtocol::GetDestination() function that really needs TBusClientSession* unlike all other protocols. Look at review 32425 (http://rb.yandex-team.ru/arc/r/32425/) for more information. private: TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> Session; }; using TBusSyncClientSessionPtr = TIntrusivePtr; /////////////////////////////////////////////////////////////////// /// \brief Main message queue object, need one per application class TBusMessageQueue: public TAtomicRefCount { /// allow mesage queue to be created only via factory friend TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); friend class ::NBus::NPrivate::TRemoteConnection; friend struct ::NBus::NPrivate::TBusSessionImpl; friend class ::NBus::NPrivate::TAcceptor; friend struct ::NBus::TBusServerSession; private: const TBusQueueConfig Config; TMutex Lock; TList> Sessions; TSimpleIntrusivePtr Locator; NPrivate::TScheduler Scheduler; ::NActor::TExecutorPtr WorkQueue; TAtomic Running; TSystemEvent ShutdownComplete; private: /// constructor is protected, used NBus::CreateMessageQueue() to create a instance TBusMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name); public: TString GetNameInternal() const; ~TBusMessageQueue(); void Stop(); bool IsRunning(); public: void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) { WorkQueue->EnqueueWork(w); } ::NActor::TExecutor* GetExecutor() { return WorkQueue.Get(); } TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) const; // without sessions NPrivate::TBusMessageQueueStatus GetStatusRecordInternal() const; TString GetStatusSelf() const; TString GetStatusSingleLine() const; TBusLocator* GetLocator() const { return Locator.Get(); } TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = ""); TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = ""); TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = ""); TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector& bindTo, const TString& name = ""); private: void Destroy(TBusSession* session); void Destroy(TBusSyncClientSessionPtr session); public: void Schedule(NPrivate::IScheduleItemAutoPtr i); private: void DestroyAllSessions(); void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session); void Remove(TBusSession* session); }; ///////////////////////////////////////////////////////////////// /// Factory methods to construct message queue TBusMessageQueuePtr CreateMessageQueue(const char* name = ""); TBusMessageQueuePtr CreateMessageQueue(NActor::TExecutorPtr executor, const char* name = ""); TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = ""); TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = ""); TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = ""); }