module.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. #pragma once
  2. ///////////////////////////////////////////////////////////////////////////
  3. /// \file
  4. /// \brief Application interface for modules
  5. /// NBus::TBusModule provides foundation for implementation of asynchronous
  6. /// modules that communicate with multiple external or local sessions
  7. /// NBus::TBusSession.
  8. /// To implement the module some virtual functions need to be overridden:
  9. /// NBus::TBusModule::CreateExtSession() creates and registers an
  10. /// external session that receives incoming messages as input for module
  11. /// processing.
  12. /// When new incoming message arrives the new NBus::TBusJob is created.
  13. /// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
  14. /// during processing of one incoming message. Default implementation of
  15. /// NBus::TBusJob will maintain all sent and received messages during
  16. /// lifetime of this job. Each message, status and reply can be found
  17. /// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
  18. /// needs to maintain additional information during the lifetime of the job
  19. /// you can derive your own class from NBus::TBusJob and override job
  20. /// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
  21. /// Processing of a given message starts with a call to NBus::TBusModule::Start()
  22. /// handler that should be overridden in the module implementation. Within
  23. /// the callback handler module can perform any computation and access any
  24. /// datastore tables that it needs. The handler can also access any module
  25. /// variables. However, the same handler can be called from multiple threads, so
  26. /// it is recommended that the handler only access read-only module-level variables.
  27. /// Handler should use NBus::TBusJob::Send() to send messages to other client
  28. /// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
  29. /// job message. When handler is done, it returns the pointer to the next handler to call
  30. /// when all pending messages have cleared. If handler
  31. /// returns pointer to itself the module will reschedule execution of this handler
  32. /// for a later time. This should be done in case NBus::TBusJob::Send() returns
  33. /// error (not MESSAGE_OK)
  34. #include "startsession.h"
  35. #include <library/cpp/messagebus/ybus.h>
  36. #include <util/generic/noncopyable.h>
  37. #include <util/generic/object_counter.h>
  38. namespace NBus {
  // Module-API classes that are defined further down in this header.
  class TBusJob;
  class TBusModule;

  // Implementation-side types. Only their names are needed here: this header
  // befriends them from TBusJob and stores pointers to some of them.
  namespace NPrivate {
      struct TBusModuleImpl;
      struct TCallJobHandlerWorkItem;
      struct TJobRunner;
      struct TModuleClientHandler;
      struct TModuleServerHandler;
  }
  48. class TJobHandler {
  49. protected:
  50. typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
  51. TBusHandlerPtr MyPtr;
  52. public:
  53. template <class B>
  54. TJobHandler(TJobHandler (B::*fptr)(TBusJob* job, TBusMessage* mess)) {
  55. MyPtr = static_cast<TBusHandlerPtr>(fptr);
  56. }
  57. TJobHandler(TBusHandlerPtr fptr = nullptr) {
  58. MyPtr = fptr;
  59. }
  60. TJobHandler(const TJobHandler&) = default;
  61. TJobHandler& operator =(const TJobHandler&) = default;
  62. bool operator==(TJobHandler h) const {
  63. return MyPtr == h.MyPtr;
  64. }
  65. bool operator!=(TJobHandler h) const {
  66. return MyPtr != h.MyPtr;
  67. }
  68. bool operator!() const {
  69. return !MyPtr;
  70. }
  71. TJobHandler operator()(TBusModule* b, TBusJob* job, TBusMessage* mess) {
  72. return (b->*MyPtr)(job, mess);
  73. }
  74. };
  75. typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
  76. ////////////////////////////////////////////////////
  77. /// \brief Pending message state
  78. struct TJobState {
  79. friend class TBusJob;
  80. friend class ::TCrawlerModule;
  81. TReplyHandler Handler;
  82. EMessageStatus Status;
  83. TBusMessage* Message;
  84. TBusMessage* Reply;
  85. TBusClientSession* Session;
  86. size_t NumRetries;
  87. size_t MaxRetries;
  88. // If != NULL then use it as destination.
  89. TNetAddr Addr;
  90. bool UseAddr;
  91. bool OneWay;
  92. private:
  93. TJobState(TReplyHandler handler,
  94. EMessageStatus status,
  95. TBusMessage* mess, TBusClientSession* session, TBusMessage* reply, size_t maxRetries = 0,
  96. const TNetAddr* addr = nullptr, bool oneWay = false)
  97. : Handler(handler)
  98. , Status(status)
  99. , Message(mess)
  100. , Reply(reply)
  101. , Session(session)
  102. , NumRetries(0)
  103. , MaxRetries(maxRetries)
  104. , OneWay(oneWay)
  105. {
  106. if (!!addr) {
  107. Addr = *addr;
  108. }
  109. UseAddr = !!addr;
  110. }
  111. public:
  112. TString GetStatus(unsigned flags);
  113. };
  114. using TJobStateVec = TVector<TJobState>;
  115. /////////////////////////////////////////////////////////
  116. /// \brief Execution item = thread
  117. /// Maintains internal state of document in computation
  118. class TBusJob {
  119. TObjectCounter<TBusJob> ObjectCounter;
  120. private:
  121. void CheckThreadCurrentJob();
  122. public:
  123. /// given a module and starter message
  124. TBusJob(TBusModule* module, TBusMessage* message);
  125. /// destructor will free all the message that were send and received
  126. virtual ~TBusJob();
  127. TBusMessage* GetMessage() const {
  128. return Message;
  129. }
  130. TNetAddr GetPeerAddrNetAddr() const;
  131. /// send message to any other session or application
  132. /// If addr is set then use it as destination.
  133. void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
  134. void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
  135. void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
  136. void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
  137. /// send reply to the starter message
  138. virtual void SendReply(TBusMessageAutoPtr reply);
  139. /// set the flag to terminate job at the earliest convenience
  140. void Cancel(EMessageStatus status);
  141. /// helper to put item on finished list of states
  142. /// It should not be a part of public API,
  143. /// so prohibit it for all except current users.
  144. private:
  145. friend class ::TCrawlerModule;
  146. void PutState(const TJobState& state) {
  147. Finished.push_back(state);
  148. }
  149. public:
  150. /// retrieve all pending messages
  151. void GetPending(TJobStateVec* stateVec) {
  152. Y_ASSERT(stateVec);
  153. *stateVec = Pending;
  154. }
  155. /// helper function to find state of previously sent messages
  156. template <class MessageType>
  157. TJobState* GetState(int* startFrom = nullptr) {
  158. for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
  159. TJobState* call = &Finished[i];
  160. if (call->Reply != nullptr && dynamic_cast<MessageType*>(call->Reply)) {
  161. if (startFrom) {
  162. *startFrom = i;
  163. }
  164. return call;
  165. }
  166. if (call->Message != nullptr && dynamic_cast<MessageType*>(call->Message)) {
  167. if (startFrom) {
  168. *startFrom = i;
  169. }
  170. return call;
  171. }
  172. }
  173. return nullptr;
  174. }
  175. /// helper function to find response for previously sent messages
  176. template <class MessageType>
  177. MessageType* Get(int* startFrom = nullptr) {
  178. for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
  179. TJobState& call = Finished[i];
  180. if (call.Reply != nullptr && dynamic_cast<MessageType*>(call.Reply)) {
  181. if (startFrom) {
  182. *startFrom = i;
  183. }
  184. return static_cast<MessageType*>(call.Reply);
  185. }
  186. if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
  187. if (startFrom) {
  188. *startFrom = i;
  189. }
  190. return static_cast<MessageType*>(call.Message);
  191. }
  192. }
  193. return nullptr;
  194. }
  195. /// helper function to find status for previously sent message
  196. template <class MessageType>
  197. EMessageStatus GetStatus(int* startFrom = nullptr) {
  198. for (int i = startFrom ? *startFrom : 0; i < int(Finished.size()); i++) {
  199. TJobState& call = Finished[i];
  200. if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
  201. if (startFrom) {
  202. *startFrom = i;
  203. }
  204. return call.Status;
  205. }
  206. }
  207. return MESSAGE_UNKNOWN;
  208. }
  209. /// helper function to clear state of previosly sent messages
  210. template <class MessageType>
  211. void Clear() {
  212. for (size_t i = 0; i < Finished.size();) {
  213. // `Finished.size() - i` decreases with each iteration
  214. // we either increment i, or remove element from Finished.
  215. TJobState& call = Finished[i];
  216. if (call.Message != nullptr && dynamic_cast<MessageType*>(call.Message)) {
  217. ClearState(call);
  218. } else {
  219. ++i;
  220. }
  221. }
  222. }
  223. /// helper function to clear state in order to try again
  224. void ClearState(TJobState& state);
  225. /// clears all message states
  226. void ClearAllMessageStates();
  227. /// returns true if job is done
  228. bool IsDone();
  229. /// return human reabable status of this job
  230. virtual TString GetStatus(unsigned flags);
  231. /// set sleep time for job
  232. void Sleep(int milliSeconds);
  233. void CallJobHandlerOnly();
  234. private:
  235. bool CallJobHandler();
  236. void DoCallReplyHandler(TJobState&);
  237. /// send out all Pending jobs, failed sends will be migrated to Finished
  238. bool SendPending();
  239. bool AnyPendingToSend();
  240. public:
  241. /// helper to call from OnReply() and OnError()
  242. int CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
  243. public:
  244. TJobHandler Handler; ///< job handler to be executed within next CallJobHandler()
  245. EMessageStatus Status; ///< set != MESSAGE_OK if job should terminate asap
  246. private:
  247. NPrivate::TJobRunner* Runner;
  248. TBusMessage* Message;
  249. THolder<TBusMessage> MessageHolder;
  250. TOnMessageContext OnMessageContext; // starter
  251. public:
  252. bool ReplySent;
  253. private:
  254. friend class TBusModule;
  255. friend struct NPrivate::TBusModuleImpl;
  256. friend struct NPrivate::TCallJobHandlerWorkItem;
  257. friend struct NPrivate::TModuleServerHandler;
  258. friend struct NPrivate::TModuleClientHandler;
  259. friend struct NPrivate::TJobRunner;
  260. TJobStateVec Pending; ///< messages currently outstanding via Send()
  261. TJobStateVec Finished; ///< messages that were replied to
  262. TBusModule* Module;
  263. NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
  264. TBusInstant SleepUntil; ///< time to wakeup, 0 if no sleep
  265. };
  266. ////////////////////////////////////////////////////////////////////
  267. /// \brief Classes to implement basic module functionality
  268. class IJobFactory {
  269. protected:
  270. virtual ~IJobFactory() {
  271. }
  272. public:
  273. /// job factory method, override to create custom jobs
  274. virtual TBusJob* CreateJobInstance(TBusMessage* message) = 0;
  275. };
  276. struct TBusModuleConfig {
  277. unsigned StarterMaxInFlight;
  278. struct TSecret {
  279. TDuration SchedulePeriod;
  280. TSecret();
  281. };
  282. TSecret Secret;
  283. TBusModuleConfig();
  284. };
  285. namespace NPrivate {
  286. struct TBusModuleInternal: public TAtomicRefCount<TBusModuleInternal> {
  287. virtual TVector<TBusClientSessionPtr> GetClientSessionsInternal() = 0;
  288. virtual TVector<TBusServerSessionPtr> GetServerSessionsInternal() = 0;
  289. virtual TBusMessageQueue* GetQueue() = 0;
  290. virtual TString GetNameInternal() = 0;
  291. virtual TString GetStatusSingleLine() = 0;
  292. virtual ~TBusModuleInternal() {
  293. }
  294. };
  295. }
  296. class TBusModule: public IJobFactory, TNonCopyable {
  297. friend class TBusJob;
  298. TObjectCounter<TBusModule> ObjectCounter;
  299. TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
  300. public:
  301. /// Each module should have a name which is used as protocol service
  302. TBusModule(const char* name);
  303. ~TBusModule() override;
  304. const char* GetName() const;
  305. void SetConfig(const TBusModuleConfig& config);
  306. /// get status of all jobs in flight
  307. TString GetStatus(unsigned flags = 0);
  308. /// called when application is about to start
  309. virtual bool StartInput();
  310. /// called when application is about to exit
  311. virtual bool Shutdown();
  312. // this default implementation just creates TBusJob object
  313. TBusJob* CreateJobInstance(TBusMessage* message) override;
  314. EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
  315. /// creates private sessions, calls CreateExtSession(), should be called before StartInput()
  316. bool CreatePrivateSessions(TBusMessageQueue* queue);
  317. virtual void OnClientConnectionEvent(const TClientConnectionEvent& event);
  318. public:
  319. /// entry point into module, first function to call
  320. virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
  321. protected:
  322. /// override this function to create destination session
  323. virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;
  324. public:
  325. int GetModuleSessionInFlight() const;
  326. TIntrusivePtr<NPrivate::TBusModuleInternal> GetInternal();
  327. protected:
  328. TBusServerSessionPtr CreateDefaultDestination(TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name = TString());
  329. TBusClientSessionPtr CreateDefaultSource(TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name = TString());
  330. TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
  331. };
  332. }