module.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881
  1. #include "module.h"
  2. #include <library/cpp/messagebus/scheduler_actor.h>
  3. #include <library/cpp/messagebus/thread_extra.h>
  4. #include <library/cpp/messagebus/actor/actor.h>
  5. #include <library/cpp/messagebus/actor/queue_in_actor.h>
  6. #include <library/cpp/messagebus/actor/what_thread_does.h>
  7. #include <library/cpp/messagebus/actor/what_thread_does_guard.h>
  8. #include <util/generic/singleton.h>
  9. #include <util/string/printf.h>
  10. #include <util/system/event.h>
  11. using namespace NActor;
  12. using namespace NBus;
  13. using namespace NBus::NPrivate;
  14. namespace {
  15. Y_POD_STATIC_THREAD(TBusJob*)
  16. ThreadCurrentJob;
  17. struct TThreadCurrentJobGuard {
  18. TBusJob* Prev;
  19. TThreadCurrentJobGuard(TBusJob* job)
  20. : Prev(ThreadCurrentJob)
  21. {
  22. Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job);
  23. ThreadCurrentJob = job;
  24. }
  25. ~TThreadCurrentJobGuard() {
  26. ThreadCurrentJob = Prev;
  27. }
  28. };
  29. void ClearState(NBus::TJobState* state) {
  30. /// skip sendbacks handlers
  31. if (state->Message != state->Reply) {
  32. if (state->Message) {
  33. delete state->Message;
  34. state->Message = nullptr;
  35. }
  36. if (state->Reply) {
  37. delete state->Reply;
  38. state->Reply = nullptr;
  39. }
  40. }
  41. }
  42. void ClearJobStateVector(NBus::TJobStateVec* vec) {
  43. Y_ASSERT(vec);
  44. for (auto& call : *vec) {
  45. ClearState(&call);
  46. }
  47. vec->clear();
  48. }
  49. }
  50. namespace NBus {
  51. namespace NPrivate {
  52. class TJobStorage {
  53. };
  54. struct TModuleClientHandler
  55. : public IBusClientHandler {
  56. TModuleClientHandler(TBusModuleImpl* module)
  57. : Module(module)
  58. {
  59. }
  60. void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
  61. void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
  62. void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
  63. void OnClientConnectionEvent(const TClientConnectionEvent& event) override;
  64. TBusModuleImpl* const Module;
  65. };
  66. struct TModuleServerHandler
  67. : public IBusServerHandler {
  68. TModuleServerHandler(TBusModuleImpl* module)
  69. : Module(module)
  70. {
  71. }
  72. void OnMessage(TOnMessageContext& msg) override;
  73. TBusModuleImpl* const Module;
  74. };
  75. struct TBusModuleImpl: public TBusModuleInternal {
  76. TBusModule* const Module;
  77. TBusMessageQueue* Queue;
  78. TScheduler Scheduler;
  79. const char* const Name;
  80. typedef TList<TJobRunner*> TBusJobList;
  81. /// jobs currently in-flight on this module
  82. TBusJobList Jobs;
  83. /// module level mutex
  84. TMutex Lock;
  85. TCondVar ShutdownCondVar;
  86. TAtomic JobCount;
  87. enum EState {
  88. CREATED,
  89. RUNNING,
  90. STOPPED,
  91. };
  92. TAtomic State;
  93. TBusModuleConfig ModuleConfig;
  94. TBusServerSessionPtr ExternalSession;
  95. /// protocol for local proxy session
  96. THolder<IBusClientHandler> ModuleClientHandler;
  97. THolder<IBusServerHandler> ModuleServerHandler;
  98. TVector<TSimpleSharedPtr<TBusStarter>> Starters;
  99. // Sessions must be destroyed before
  100. // ModuleClientHandler / ModuleServerHandler
  101. TVector<TBusClientSessionPtr> ClientSessions;
  102. TVector<TBusServerSessionPtr> ServerSessions;
  103. TBusModuleImpl(TBusModule* module, const char* name)
  104. : Module(module)
  105. , Queue()
  106. , Name(name)
  107. , JobCount(0)
  108. , State(CREATED)
  109. , ExternalSession(nullptr)
  110. , ModuleClientHandler(new TModuleClientHandler(this))
  111. , ModuleServerHandler(new TModuleServerHandler(this))
  112. {
  113. }
  114. ~TBusModuleImpl() override {
  115. // Shutdown cannot be called from destructor,
  116. // because module has virtual methods.
  117. Y_ABORT_UNLESS(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");
  118. Scheduler.Stop();
  119. while (!Jobs.empty()) {
  120. DestroyJob(Jobs.front());
  121. }
  122. Y_ABORT_UNLESS(JobCount == 0, "state check");
  123. }
  124. void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);
  125. void AddJob(TJobRunner* jobRunner);
  126. void DestroyJob(TJobRunner* job);
  127. /// terminate job on this message
  128. void CancelJob(TBusJob* job, EMessageStatus status);
  129. /// prints statuses of jobs
  130. TString GetStatus(unsigned flags);
  131. size_t Size() const {
  132. return AtomicGet(JobCount);
  133. }
  134. void Shutdown();
  135. TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
  136. return ClientSessions;
  137. }
  138. TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
  139. return ServerSessions;
  140. }
  141. TBusMessageQueue* GetQueue() override {
  142. return Queue;
  143. }
  144. TString GetNameInternal() override {
  145. return Name;
  146. }
  147. TString GetStatusSingleLine() override {
  148. TStringStream ss;
  149. ss << "jobs: " << Size();
  150. return ss.Str();
  151. }
  152. void OnClientConnectionEvent(const TClientConnectionEvent& event) {
  153. Module->OnClientConnectionEvent(event);
  154. }
  155. };
  156. struct TJobResponseMessage {
  157. TBusMessage* Request;
  158. TBusMessage* Response;
  159. EMessageStatus Status;
  160. TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
  161. : Request(request)
  162. , Response(response)
  163. , Status(status)
  164. {
  165. }
  166. };
  167. struct TJobRunner: public TAtomicRefCount<TJobRunner>,
  168. public NActor::TActor<TJobRunner>,
  169. public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
  170. public TScheduleActor<TJobRunner> {
  171. THolder<TBusJob> Job;
  172. TList<TJobRunner*>::iterator JobStorageIterator;
  173. TJobRunner(TAutoPtr<TBusJob> job)
  174. : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
  175. , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
  176. , Job(job.Release())
  177. , JobStorageIterator()
  178. {
  179. Job->Runner = this;
  180. }
  181. ~TJobRunner() override {
  182. Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
  183. }
  184. void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
  185. Job->CallReplyHandler(message.Status, message.Request, message.Response);
  186. }
  187. void Destroy() {
  188. if (!!Job->OnMessageContext) {
  189. if (!Job->ReplySent) {
  190. Job->OnMessageContext.ForgetRequest();
  191. }
  192. }
  193. Job->ModuleImpl->DestroyJob(this);
  194. }
  195. void Act(NActor::TDefaultTag) {
  196. if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
  197. return;
  198. }
  199. if (Job->SleepUntil != 0) {
  200. if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
  201. Destroy();
  202. return;
  203. }
  204. }
  205. TThreadCurrentJobGuard g(Job.Get());
  206. NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();
  207. if (Alarm.FetchTask()) {
  208. if (Job->AnyPendingToSend()) {
  209. Y_ASSERT(Job->SleepUntil == 0);
  210. Job->SendPending();
  211. if (Job->AnyPendingToSend()) {
  212. }
  213. } else {
  214. // regular alarm
  215. Y_ASSERT(Job->Pending.empty());
  216. Y_ASSERT(Job->SleepUntil != 0);
  217. Job->SleepUntil = 0;
  218. }
  219. }
  220. for (;;) {
  221. if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
  222. TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
  223. Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
  224. }
  225. if (Job->SleepUntil != 0) {
  226. ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
  227. return;
  228. }
  229. Job->SendPending();
  230. if (Job->AnyPendingToSend()) {
  231. ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
  232. return;
  233. }
  234. if (!Job->Pending.empty()) {
  235. // waiting replies
  236. return;
  237. }
  238. if (Job->IsDone()) {
  239. Destroy();
  240. return;
  241. }
  242. }
  243. }
  244. };
  245. }
  246. static inline TJobRunner* GetJob(TBusMessage* message) {
  247. return (TJobRunner*)message->Data;
  248. }
  249. static inline void SetJob(TBusMessage* message, TJobRunner* job) {
  250. message->Data = job;
  251. }
  252. TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
  253. : Status(MESSAGE_OK)
  254. , Runner()
  255. , Message(message)
  256. , ReplySent(false)
  257. , Module(module)
  258. , ModuleImpl(module->Impl.Get())
  259. , SleepUntil(0)
  260. {
  261. Handler = TJobHandler(&TBusModule::Start);
  262. }
  263. TBusJob::~TBusJob() {
  264. Y_ASSERT(Pending.size() == 0);
  265. //Y_ASSERT(SleepUntil == 0);
  266. ClearAllMessageStates();
  267. }
  268. TNetAddr TBusJob::GetPeerAddrNetAddr() const {
  269. Y_ABORT_UNLESS(!!OnMessageContext);
  270. return OnMessageContext.GetPeerAddrNetAddr();
  271. }
  272. void TBusJob::CheckThreadCurrentJob() {
  273. Y_ASSERT(ThreadCurrentJob == this);
  274. }
  275. /////////////////////////////////////////////////////////
  276. /// \brief Send messages in pending list
  277. /// If at least one message is gone return true
  278. /// If message has not been send, move it to Finished with appropriate error code
  279. bool TBusJob::SendPending() {
  280. // Iterator type must be size_t, not vector::iterator,
  281. // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector,
  282. // that in turn invalidates iterator.
  283. // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending`
  284. // (not erases, and not inserts) so iteration by index is valid.
  285. size_t it = 0;
  286. while (it != Pending.size()) {
  287. TJobState& call = Pending[it];
  288. if (call.Status == MESSAGE_DONT_ASK) {
  289. EMessageStatus getAddressStatus = MESSAGE_OK;
  290. TNetAddr addr;
  291. if (call.UseAddr) {
  292. addr = call.Addr;
  293. } else {
  294. getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr);
  295. }
  296. if (getAddressStatus == MESSAGE_OK) {
  297. // hold extra reference for each request in flight
  298. Runner->Ref();
  299. if (call.OneWay) {
  300. call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
  301. } else {
  302. call.Status = call.Session->SendMessage(call.Message, &addr);
  303. }
  304. if (call.Status != MESSAGE_OK) {
  305. Runner->UnRef();
  306. }
  307. } else {
  308. call.Status = getAddressStatus;
  309. }
  310. }
  311. if (call.Status == MESSAGE_OK) {
  312. ++it; // keep pending list until we get reply
  313. } else if (call.Status == MESSAGE_BUSY) {
  314. Y_ABORT("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight");
  315. } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) {
  316. ++it; // try up to call.MaxRetries times to send message
  317. call.NumRetries++;
  318. DoCallReplyHandler(call);
  319. call.Status = MESSAGE_DONT_ASK;
  320. call.Message->Reset(); // generate new Id
  321. } else {
  322. Finished.push_back(call);
  323. DoCallReplyHandler(call);
  324. Pending.erase(Pending.begin() + it);
  325. }
  326. }
  327. return Pending.size() > 0;
  328. }
  329. bool TBusJob::AnyPendingToSend() {
  330. for (unsigned i = 0; i < Pending.size(); ++i) {
  331. if (Pending[i].Status == MESSAGE_DONT_ASK) {
  332. return true;
  333. }
  334. }
  335. return false;
  336. }
  337. bool TBusJob::IsDone() {
  338. bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
  339. return r;
  340. }
  341. void TBusJob::CallJobHandlerOnly() {
  342. TThreadCurrentJobGuard threadCurrentJobGuard(this);
  343. TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
  344. Handler = Handler(ModuleImpl->Module, this, Message);
  345. }
  346. bool TBusJob::CallJobHandler() {
  347. /// go on as far as we can go without waiting
  348. while (!IsDone()) {
  349. /// call the handler
  350. CallJobHandlerOnly();
  351. /// quit if job is canceled
  352. if (Status != MESSAGE_OK) {
  353. break;
  354. }
  355. /// there are messages to send and wait for reply
  356. SendPending();
  357. if (!Pending.empty()) {
  358. break;
  359. }
  360. /// asked to sleep
  361. if (SleepUntil) {
  362. break;
  363. }
  364. }
  365. Y_ABORT_UNLESS(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
  366. "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
  367. Message->GetHeader()->Id, Message->GetHeader()->Type);
  368. return IsDone();
  369. }
  370. void TBusJob::DoCallReplyHandler(TJobState& call) {
  371. if (call.Handler) {
  372. TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");
  373. TThreadCurrentJobGuard threadCurrentJobGuard(this);
  374. (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
  375. }
  376. }
  377. int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
  378. /// find handler for given message and update it's status
  379. size_t i = 0;
  380. for (; i < Pending.size(); ++i) {
  381. TJobState& call = Pending[i];
  382. if (call.Message == mess) {
  383. break;
  384. }
  385. }
  386. /// if not found, report error
  387. if (i == Pending.size()) {
  388. Y_ABORT("must not happen");
  389. }
  390. /// fill in response into job state
  391. TJobState& call = Pending[i];
  392. call.Status = status;
  393. Y_ASSERT(call.Message == mess);
  394. call.Reply = reply;
  395. if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
  396. call.NumRetries++;
  397. call.Status = MESSAGE_DONT_ASK;
  398. call.Message->Reset(); // generate new Id
  399. DoCallReplyHandler(call);
  400. return 0;
  401. }
  402. /// call the handler if provided
  403. DoCallReplyHandler(call);
  404. /// move job state into the finished stack
  405. Finished.push_back(Pending[i]);
  406. Pending.erase(Pending.begin() + i);
  407. return 0;
  408. }
  409. ///////////////////////////////////////////////////////////////
  410. /// send message to any other session or application
  411. void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
  412. CheckThreadCurrentJob();
  413. SetJob(mess.Get(), Runner);
  414. Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
  415. }
  416. void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
  417. CheckThreadCurrentJob();
  418. SetJob(mess.Get(), Runner);
  419. Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
  420. }
  421. void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
  422. CheckThreadCurrentJob();
  423. SetJob(req.Get(), Runner);
  424. Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
  425. }
  426. void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
  427. CheckThreadCurrentJob();
  428. SetJob(req.Get(), Runner);
  429. Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
  430. }
  431. ///////////////////////////////////////////////////////////////
  432. /// send reply to the starter message
  433. void TBusJob::SendReply(TBusMessageAutoPtr reply) {
  434. CheckThreadCurrentJob();
  435. Y_ABORT_UNLESS(!ReplySent, "cannot call SendReply twice");
  436. ReplySent = true;
  437. if (!OnMessageContext)
  438. return;
  439. EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
  440. if (ok != MESSAGE_OK) {
  441. // TODO: count errors
  442. }
  443. }
  444. /// set the flag to terminate job at the earliest convenience
  445. void TBusJob::Cancel(EMessageStatus status) {
  446. CheckThreadCurrentJob();
  447. Status = status;
  448. }
  449. void TBusJob::ClearState(TJobState& call) {
  450. TJobStateVec::iterator it;
  451. for (it = Finished.begin(); it != Finished.end(); ++it) {
  452. TJobState& state = *it;
  453. if (&call == &state) {
  454. ::ClearState(&call);
  455. Finished.erase(it);
  456. return;
  457. }
  458. }
  459. Y_ASSERT(0);
  460. }
  461. void TBusJob::ClearAllMessageStates() {
  462. ClearJobStateVector(&Finished);
  463. ClearJobStateVector(&Pending);
  464. }
  465. void TBusJob::Sleep(int milliSeconds) {
  466. CheckThreadCurrentJob();
  467. Y_ABORT_UNLESS(Pending.empty(), "sleep is not allowed when there are pending job");
  468. Y_ABORT_UNLESS(SleepUntil == 0, "must not override sleep");
  469. SleepUntil = Now() + milliSeconds;
  470. }
  471. TString TBusJob::GetStatus(unsigned flags) {
  472. TString strReturn;
  473. strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
  474. Message->GetHeader()->Id,
  475. (int)Message->GetHeader()->Type,
  476. (int)(Now() - Message->GetHeader()->SendTime) / 1000,
  477. (int)Pending.size(),
  478. (int)Finished.size(),
  479. Status != MESSAGE_OK ? ToString(Status).data() : "");
  480. TJobStateVec::iterator it;
  481. for (it = Pending.begin(); it != Pending.end(); ++it) {
  482. TJobState& call = *it;
  483. strReturn += call.GetStatus(flags);
  484. }
  485. return strReturn;
  486. }
  487. TString TJobState::GetStatus(unsigned flags) {
  488. Y_UNUSED(flags);
  489. TString strReturn;
  490. strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n",
  491. Message->GetHeader()->Id,
  492. (int)Message->GetHeader()->Type,
  493. Session->GetProto()->GetService(),
  494. (int)(Now() - Message->GetHeader()->SendTime) / 1000,
  495. ToString(Status).data());
  496. return strReturn;
  497. }
  498. //////////////////////////////////////////////////////////////////////
  499. void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
  500. TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
  501. if (job) {
  502. job->Cancel(status);
  503. }
  504. }
  505. TString TBusModuleImpl::GetStatus(unsigned flags) {
  506. Y_UNUSED(flags);
  507. TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
  508. TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size());
  509. for (auto job : Jobs) {
  510. //strReturn += job->Job->GetStatus(flags);
  511. Y_UNUSED(job);
  512. strReturn += "TODO\n";
  513. }
  514. return strReturn;
  515. }
  516. TBusModuleConfig::TBusModuleConfig()
  517. : StarterMaxInFlight(1000)
  518. {
  519. }
  520. TBusModuleConfig::TSecret::TSecret()
  521. : SchedulePeriod(TDuration::Seconds(1))
  522. {
  523. }
  524. TBusModule::TBusModule(const char* name)
  525. : Impl(new TBusModuleImpl(this, name))
  526. {
  527. }
  528. TBusModule::~TBusModule() {
  529. }
  530. const char* TBusModule::GetName() const {
  531. return Impl->Name;
  532. }
  533. void TBusModule::SetConfig(const TBusModuleConfig& config) {
  534. Impl->ModuleConfig = config;
  535. }
  536. bool TBusModule::StartInput() {
  537. Y_ABORT_UNLESS(Impl->State == TBusModuleImpl::CREATED, "state check");
  538. Y_ABORT_UNLESS(!!Impl->Queue, "state check");
  539. Impl->State = TBusModuleImpl::RUNNING;
  540. Y_ASSERT(!Impl->ExternalSession);
  541. TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
  542. if (extSession != nullptr) {
  543. Impl->ExternalSession = extSession;
  544. }
  545. return true;
  546. }
  547. bool TBusModule::Shutdown() {
  548. Impl->Shutdown();
  549. return true;
  550. }
  551. TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
  552. TBusJob* job = new TBusJob(this, message);
  553. return job;
  554. }
  555. /**
  556. Example for external session creation:
  557. TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
  558. TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig);
  559. session->RegisterService(hostname, begin, end);
  560. return session;
  561. */
  562. bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
  563. Impl->Queue = queue;
  564. return true;
  565. }
  566. int TBusModule::GetModuleSessionInFlight() const {
  567. return Impl->Size();
  568. }
  569. TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
  570. return Impl.Get();
  571. }
  572. TBusServerSessionPtr TBusModule::CreateDefaultDestination(
  573. TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
  574. TBusServerSessionConfig patchedConfig = config;
  575. patchedConfig.ExecuteOnMessageInWorkerPool = false;
  576. if (!patchedConfig.Name) {
  577. patchedConfig.Name = name;
  578. }
  579. if (!patchedConfig.Name) {
  580. patchedConfig.Name = Impl->Name;
  581. }
  582. TBusServerSessionPtr session =
  583. TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
  584. Impl->ServerSessions.push_back(session);
  585. return session;
  586. }
  587. TBusClientSessionPtr TBusModule::CreateDefaultSource(
  588. TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
  589. TBusClientSessionConfig patchedConfig = config;
  590. patchedConfig.ExecuteOnReplyInWorkerPool = false;
  591. if (!patchedConfig.Name) {
  592. patchedConfig.Name = name;
  593. }
  594. if (!patchedConfig.Name) {
  595. patchedConfig.Name = Impl->Name;
  596. }
  597. TBusClientSessionPtr session =
  598. TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
  599. Impl->ClientSessions.push_back(session);
  600. return session;
  601. }
  602. TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
  603. TBusStarter* session = new TBusStarter(this, config);
  604. Impl->Starters.push_back(session);
  605. return session;
  606. }
  607. void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
  608. Y_UNUSED(event);
  609. }
  610. TString TBusModule::GetStatus(unsigned flags) {
  611. TString strReturn = Sprintf("%s\n", Impl->Name);
  612. strReturn += Impl->GetStatus(flags);
  613. return strReturn;
  614. }
  615. }
  616. void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
  617. TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
  618. Jobs.push_back(jobRunner);
  619. jobRunner->JobStorageIterator = Jobs.end();
  620. --jobRunner->JobStorageIterator;
  621. }
  622. void TBusModuleImpl::DestroyJob(TJobRunner* job) {
  623. Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator());
  624. {
  625. TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob");
  626. int jobCount = AtomicDecrement(JobCount);
  627. Y_ABORT_UNLESS(jobCount >= 0, "decremented too much");
  628. Jobs.erase(job->JobStorageIterator);
  629. if (AtomicGet(State) == STOPPED) {
  630. if (jobCount == 0) {
  631. ShutdownCondVar.BroadCast();
  632. }
  633. }
  634. }
  635. job->JobStorageIterator = TList<TJobRunner*>::iterator();
  636. }
  637. void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
  638. TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
  639. Y_ABORT_UNLESS(!!msg);
  640. THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg)));
  641. jobRunner->Job->MessageHolder.Reset(msg0.Release());
  642. jobRunner->Job->OnMessageContext.Swap(context);
  643. SetJob(jobRunner->Job->Message, jobRunner.Get());
  644. AtomicIncrement(JobCount);
  645. AddJob(jobRunner.Get());
  646. jobRunner.Release()->Schedule();
  647. }
  648. void TBusModuleImpl::Shutdown() {
  649. if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
  650. AtomicSet(State, TBusModuleImpl::STOPPED);
  651. return;
  652. }
  653. AtomicSet(State, TBusModuleImpl::STOPPED);
  654. for (auto& clientSession : ClientSessions) {
  655. clientSession->Shutdown();
  656. }
  657. for (auto& serverSession : ServerSessions) {
  658. serverSession->Shutdown();
  659. }
  660. for (size_t starter = 0; starter < Starters.size(); ++starter) {
  661. Starters[starter]->Shutdown();
  662. }
  663. {
  664. TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown");
  665. for (auto& Job : Jobs) {
  666. Job->Schedule();
  667. }
  668. while (!Jobs.empty()) {
  669. ShutdownCondVar.WaitI(Lock);
  670. }
  671. }
  672. }
  673. EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
  674. Y_ABORT_UNLESS(Impl->State == TBusModuleImpl::RUNNING);
  675. Y_ABORT_UNLESS(!!Impl->Queue);
  676. if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
  677. return MESSAGE_BUSY;
  678. }
  679. TOnMessageContext dummy;
  680. Impl->OnMessageReceived(message.Release(), dummy);
  681. return MESSAGE_OK;
  682. }
  683. void TModuleServerHandler::OnMessage(TOnMessageContext& msg) {
  684. Module->OnMessageReceived(nullptr, msg);
  685. }
  686. void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) {
  687. TJobRunner* job = GetJob(req.Get());
  688. Y_ASSERT(job);
  689. Y_ASSERT(job->Job->Message != req.Get());
  690. job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
  691. job->UnRef();
  692. }
  693. void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
  694. TJobRunner* job = GetJob(req.Get());
  695. Y_ASSERT(job);
  696. Y_ASSERT(job->Job->Message != req.Get());
  697. job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK));
  698. job->UnRef();
  699. }
  700. void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
  701. TJobRunner* job = GetJob(msg.Get());
  702. if (job) {
  703. Y_ASSERT(job->Job->Message != msg.Get());
  704. job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status));
  705. job->UnRef();
  706. }
  707. }
  708. void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
  709. Module->OnClientConnectionEvent(event);
  710. }