123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881 |
- #include "module.h"
- #include <library/cpp/messagebus/scheduler_actor.h>
- #include <library/cpp/messagebus/thread_extra.h>
- #include <library/cpp/messagebus/actor/actor.h>
- #include <library/cpp/messagebus/actor/queue_in_actor.h>
- #include <library/cpp/messagebus/actor/what_thread_does.h>
- #include <library/cpp/messagebus/actor/what_thread_does_guard.h>
- #include <util/generic/singleton.h>
- #include <util/string/printf.h>
- #include <util/system/event.h>
- using namespace NActor;
- using namespace NBus;
- using namespace NBus::NPrivate;
- namespace {
- Y_POD_STATIC_THREAD(TBusJob*)
- ThreadCurrentJob;
- struct TThreadCurrentJobGuard {
- TBusJob* Prev;
- TThreadCurrentJobGuard(TBusJob* job)
- : Prev(ThreadCurrentJob)
- {
- Y_ASSERT(!ThreadCurrentJob || ThreadCurrentJob == job);
- ThreadCurrentJob = job;
- }
- ~TThreadCurrentJobGuard() {
- ThreadCurrentJob = Prev;
- }
- };
- void ClearState(NBus::TJobState* state) {
- /// skip sendbacks handlers
- if (state->Message != state->Reply) {
- if (state->Message) {
- delete state->Message;
- state->Message = nullptr;
- }
- if (state->Reply) {
- delete state->Reply;
- state->Reply = nullptr;
- }
- }
- }
- void ClearJobStateVector(NBus::TJobStateVec* vec) {
- Y_ASSERT(vec);
- for (auto& call : *vec) {
- ClearState(&call);
- }
- vec->clear();
- }
- }
- namespace NBus {
- namespace NPrivate {
- class TJobStorage {
- };
- struct TModuleClientHandler
- : public IBusClientHandler {
- TModuleClientHandler(TBusModuleImpl* module)
- : Module(module)
- {
- }
- void OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> reply) override;
- void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
- void OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) override;
- void OnClientConnectionEvent(const TClientConnectionEvent& event) override;
- TBusModuleImpl* const Module;
- };
- struct TModuleServerHandler
- : public IBusServerHandler {
- TModuleServerHandler(TBusModuleImpl* module)
- : Module(module)
- {
- }
- void OnMessage(TOnMessageContext& msg) override;
- TBusModuleImpl* const Module;
- };
- struct TBusModuleImpl: public TBusModuleInternal {
- TBusModule* const Module;
- TBusMessageQueue* Queue;
- TScheduler Scheduler;
- const char* const Name;
- typedef TList<TJobRunner*> TBusJobList;
- /// jobs currently in-flight on this module
- TBusJobList Jobs;
- /// module level mutex
- TMutex Lock;
- TCondVar ShutdownCondVar;
- TAtomic JobCount;
- enum EState {
- CREATED,
- RUNNING,
- STOPPED,
- };
- TAtomic State;
- TBusModuleConfig ModuleConfig;
- TBusServerSessionPtr ExternalSession;
- /// protocol for local proxy session
- THolder<IBusClientHandler> ModuleClientHandler;
- THolder<IBusServerHandler> ModuleServerHandler;
- TVector<TSimpleSharedPtr<TBusStarter>> Starters;
- // Sessions must be destroyed before
- // ModuleClientHandler / ModuleServerHandler
- TVector<TBusClientSessionPtr> ClientSessions;
- TVector<TBusServerSessionPtr> ServerSessions;
- TBusModuleImpl(TBusModule* module, const char* name)
- : Module(module)
- , Queue()
- , Name(name)
- , JobCount(0)
- , State(CREATED)
- , ExternalSession(nullptr)
- , ModuleClientHandler(new TModuleClientHandler(this))
- , ModuleServerHandler(new TModuleServerHandler(this))
- {
- }
- ~TBusModuleImpl() override {
- // Shutdown cannot be called from destructor,
- // because module has virtual methods.
- Y_ABORT_UNLESS(State != RUNNING, "if running, must explicitly call Shutdown() before destructor");
- Scheduler.Stop();
- while (!Jobs.empty()) {
- DestroyJob(Jobs.front());
- }
- Y_ABORT_UNLESS(JobCount == 0, "state check");
- }
- void OnMessageReceived(TAutoPtr<TBusMessage> msg, TOnMessageContext&);
- void AddJob(TJobRunner* jobRunner);
- void DestroyJob(TJobRunner* job);
- /// terminate job on this message
- void CancelJob(TBusJob* job, EMessageStatus status);
- /// prints statuses of jobs
- TString GetStatus(unsigned flags);
- size_t Size() const {
- return AtomicGet(JobCount);
- }
- void Shutdown();
- TVector<TBusClientSessionPtr> GetClientSessionsInternal() override {
- return ClientSessions;
- }
- TVector<TBusServerSessionPtr> GetServerSessionsInternal() override {
- return ServerSessions;
- }
- TBusMessageQueue* GetQueue() override {
- return Queue;
- }
- TString GetNameInternal() override {
- return Name;
- }
- TString GetStatusSingleLine() override {
- TStringStream ss;
- ss << "jobs: " << Size();
- return ss.Str();
- }
- void OnClientConnectionEvent(const TClientConnectionEvent& event) {
- Module->OnClientConnectionEvent(event);
- }
- };
- struct TJobResponseMessage {
- TBusMessage* Request;
- TBusMessage* Response;
- EMessageStatus Status;
- TJobResponseMessage(TBusMessage* request, TBusMessage* response, EMessageStatus status)
- : Request(request)
- , Response(response)
- , Status(status)
- {
- }
- };
- struct TJobRunner: public TAtomicRefCount<TJobRunner>,
- public NActor::TActor<TJobRunner>,
- public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
- public TScheduleActor<TJobRunner> {
- THolder<TBusJob> Job;
- TList<TJobRunner*>::iterator JobStorageIterator;
- TJobRunner(TAutoPtr<TBusJob> job)
- : NActor::TActor<TJobRunner>(job->ModuleImpl->Queue->GetExecutor())
- , TScheduleActor<TJobRunner>(&job->ModuleImpl->Scheduler)
- , Job(job.Release())
- , JobStorageIterator()
- {
- Job->Runner = this;
- }
- ~TJobRunner() override {
- Y_ASSERT(JobStorageIterator == TList<TJobRunner*>::iterator());
- }
- void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, const TJobResponseMessage& message) {
- Job->CallReplyHandler(message.Status, message.Request, message.Response);
- }
- void Destroy() {
- if (!!Job->OnMessageContext) {
- if (!Job->ReplySent) {
- Job->OnMessageContext.ForgetRequest();
- }
- }
- Job->ModuleImpl->DestroyJob(this);
- }
- void Act(NActor::TDefaultTag) {
- if (JobStorageIterator == TList<TJobRunner*>::iterator()) {
- return;
- }
- if (Job->SleepUntil != 0) {
- if (AtomicGet(Job->ModuleImpl->State) == TBusModuleImpl::STOPPED) {
- Destroy();
- return;
- }
- }
- TThreadCurrentJobGuard g(Job.Get());
- NActor::TQueueInActor<TJobRunner, TJobResponseMessage>::DequeueAll();
- if (Alarm.FetchTask()) {
- if (Job->AnyPendingToSend()) {
- Y_ASSERT(Job->SleepUntil == 0);
- Job->SendPending();
- if (Job->AnyPendingToSend()) {
- }
- } else {
- // regular alarm
- Y_ASSERT(Job->Pending.empty());
- Y_ASSERT(Job->SleepUntil != 0);
- Job->SleepUntil = 0;
- }
- }
- for (;;) {
- if (Job->Pending.empty() && !!Job->Handler && Job->Status == MESSAGE_OK) {
- TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
- Job->Handler = Job->Handler(Job->Module, Job.Get(), Job->Message);
- }
- if (Job->SleepUntil != 0) {
- ScheduleAt(TInstant::MilliSeconds(Job->SleepUntil));
- return;
- }
- Job->SendPending();
- if (Job->AnyPendingToSend()) {
- ScheduleAt(TInstant::Now() + TDuration::Seconds(1));
- return;
- }
- if (!Job->Pending.empty()) {
- // waiting replies
- return;
- }
- if (Job->IsDone()) {
- Destroy();
- return;
- }
- }
- }
- };
- }
- static inline TJobRunner* GetJob(TBusMessage* message) {
- return (TJobRunner*)message->Data;
- }
- static inline void SetJob(TBusMessage* message, TJobRunner* job) {
- message->Data = job;
- }
- TBusJob::TBusJob(TBusModule* module, TBusMessage* message)
- : Status(MESSAGE_OK)
- , Runner()
- , Message(message)
- , ReplySent(false)
- , Module(module)
- , ModuleImpl(module->Impl.Get())
- , SleepUntil(0)
- {
- Handler = TJobHandler(&TBusModule::Start);
- }
- TBusJob::~TBusJob() {
- Y_ASSERT(Pending.size() == 0);
- //Y_ASSERT(SleepUntil == 0);
- ClearAllMessageStates();
- }
- TNetAddr TBusJob::GetPeerAddrNetAddr() const {
- Y_ABORT_UNLESS(!!OnMessageContext);
- return OnMessageContext.GetPeerAddrNetAddr();
- }
- void TBusJob::CheckThreadCurrentJob() {
- Y_ASSERT(ThreadCurrentJob == this);
- }
- /////////////////////////////////////////////////////////
- /// \brief Send messages in pending list
- /// If at least one message is gone return true
- /// If message has not been send, move it to Finished with appropriate error code
- bool TBusJob::SendPending() {
- // Iterator type must be size_t, not vector::iterator,
- // because `DoCallReplyHandler` may call `Send` that modifies `Pending` vector,
- // that in turn invalidates iterator.
- // Implementation assumes that `DoCallReplyHandler` only pushes back to `Pending`
- // (not erases, and not inserts) so iteration by index is valid.
- size_t it = 0;
- while (it != Pending.size()) {
- TJobState& call = Pending[it];
- if (call.Status == MESSAGE_DONT_ASK) {
- EMessageStatus getAddressStatus = MESSAGE_OK;
- TNetAddr addr;
- if (call.UseAddr) {
- addr = call.Addr;
- } else {
- getAddressStatus = const_cast<TBusProtocol*>(call.Session->GetProto())->GetDestination(call.Session, call.Message, call.Session->GetQueue()->GetLocator(), &addr);
- }
- if (getAddressStatus == MESSAGE_OK) {
- // hold extra reference for each request in flight
- Runner->Ref();
- if (call.OneWay) {
- call.Status = call.Session->SendMessageOneWay(call.Message, &addr);
- } else {
- call.Status = call.Session->SendMessage(call.Message, &addr);
- }
- if (call.Status != MESSAGE_OK) {
- Runner->UnRef();
- }
- } else {
- call.Status = getAddressStatus;
- }
- }
- if (call.Status == MESSAGE_OK) {
- ++it; // keep pending list until we get reply
- } else if (call.Status == MESSAGE_BUSY) {
- Y_ABORT("MESSAGE_BUSY is prohibited in modules. Please increase MaxInFlight");
- } else if (call.Status == MESSAGE_CONNECT_FAILED && call.NumRetries < call.MaxRetries) {
- ++it; // try up to call.MaxRetries times to send message
- call.NumRetries++;
- DoCallReplyHandler(call);
- call.Status = MESSAGE_DONT_ASK;
- call.Message->Reset(); // generate new Id
- } else {
- Finished.push_back(call);
- DoCallReplyHandler(call);
- Pending.erase(Pending.begin() + it);
- }
- }
- return Pending.size() > 0;
- }
- bool TBusJob::AnyPendingToSend() {
- for (unsigned i = 0; i < Pending.size(); ++i) {
- if (Pending[i].Status == MESSAGE_DONT_ASK) {
- return true;
- }
- }
- return false;
- }
- bool TBusJob::IsDone() {
- bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
- return r;
- }
- void TBusJob::CallJobHandlerOnly() {
- TThreadCurrentJobGuard threadCurrentJobGuard(this);
- TWhatThreadDoesPushPop pp("do call job handler (do not confuse with reply handler)");
- Handler = Handler(ModuleImpl->Module, this, Message);
- }
- bool TBusJob::CallJobHandler() {
- /// go on as far as we can go without waiting
- while (!IsDone()) {
- /// call the handler
- CallJobHandlerOnly();
- /// quit if job is canceled
- if (Status != MESSAGE_OK) {
- break;
- }
- /// there are messages to send and wait for reply
- SendPending();
- if (!Pending.empty()) {
- break;
- }
- /// asked to sleep
- if (SleepUntil) {
- break;
- }
- }
- Y_ABORT_UNLESS(!(Pending.size() == 0 && Handler == nullptr && Status == MESSAGE_OK && !ReplySent),
- "Handler returned NULL without Cancel() or SendReply() for message=%016" PRIx64 " type=%d",
- Message->GetHeader()->Id, Message->GetHeader()->Type);
- return IsDone();
- }
- void TBusJob::DoCallReplyHandler(TJobState& call) {
- if (call.Handler) {
- TWhatThreadDoesPushPop pp("do call reply handler (do not confuse with job handler)");
- TThreadCurrentJobGuard threadCurrentJobGuard(this);
- (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
- }
- }
- int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
- /// find handler for given message and update it's status
- size_t i = 0;
- for (; i < Pending.size(); ++i) {
- TJobState& call = Pending[i];
- if (call.Message == mess) {
- break;
- }
- }
- /// if not found, report error
- if (i == Pending.size()) {
- Y_ABORT("must not happen");
- }
- /// fill in response into job state
- TJobState& call = Pending[i];
- call.Status = status;
- Y_ASSERT(call.Message == mess);
- call.Reply = reply;
- if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
- call.NumRetries++;
- call.Status = MESSAGE_DONT_ASK;
- call.Message->Reset(); // generate new Id
- DoCallReplyHandler(call);
- return 0;
- }
- /// call the handler if provided
- DoCallReplyHandler(call);
- /// move job state into the finished stack
- Finished.push_back(Pending[i]);
- Pending.erase(Pending.begin() + i);
- return 0;
- }
- ///////////////////////////////////////////////////////////////
- /// send message to any other session or application
- void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
- CheckThreadCurrentJob();
- SetJob(mess.Get(), Runner);
- Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
- }
- void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
- CheckThreadCurrentJob();
- SetJob(mess.Get(), Runner);
- Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
- }
- void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
- CheckThreadCurrentJob();
- SetJob(req.Get(), Runner);
- Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
- }
- void TBusJob::SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session) {
- CheckThreadCurrentJob();
- SetJob(req.Get(), Runner);
- Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, nullptr, true));
- }
- ///////////////////////////////////////////////////////////////
- /// send reply to the starter message
- void TBusJob::SendReply(TBusMessageAutoPtr reply) {
- CheckThreadCurrentJob();
- Y_ABORT_UNLESS(!ReplySent, "cannot call SendReply twice");
- ReplySent = true;
- if (!OnMessageContext)
- return;
- EMessageStatus ok = OnMessageContext.SendReplyMove(reply);
- if (ok != MESSAGE_OK) {
- // TODO: count errors
- }
- }
- /// set the flag to terminate job at the earliest convenience
- void TBusJob::Cancel(EMessageStatus status) {
- CheckThreadCurrentJob();
- Status = status;
- }
- void TBusJob::ClearState(TJobState& call) {
- TJobStateVec::iterator it;
- for (it = Finished.begin(); it != Finished.end(); ++it) {
- TJobState& state = *it;
- if (&call == &state) {
- ::ClearState(&call);
- Finished.erase(it);
- return;
- }
- }
- Y_ASSERT(0);
- }
- void TBusJob::ClearAllMessageStates() {
- ClearJobStateVector(&Finished);
- ClearJobStateVector(&Pending);
- }
- void TBusJob::Sleep(int milliSeconds) {
- CheckThreadCurrentJob();
- Y_ABORT_UNLESS(Pending.empty(), "sleep is not allowed when there are pending job");
- Y_ABORT_UNLESS(SleepUntil == 0, "must not override sleep");
- SleepUntil = Now() + milliSeconds;
- }
- TString TBusJob::GetStatus(unsigned flags) {
- TString strReturn;
- strReturn += Sprintf(" job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
- Message->GetHeader()->Id,
- (int)Message->GetHeader()->Type,
- (int)(Now() - Message->GetHeader()->SendTime) / 1000,
- (int)Pending.size(),
- (int)Finished.size(),
- Status != MESSAGE_OK ? ToString(Status).data() : "");
- TJobStateVec::iterator it;
- for (it = Pending.begin(); it != Pending.end(); ++it) {
- TJobState& call = *it;
- strReturn += call.GetStatus(flags);
- }
- return strReturn;
- }
- TString TJobState::GetStatus(unsigned flags) {
- Y_UNUSED(flags);
- TString strReturn;
- strReturn += Sprintf(" pending=%016" PRIx64 " type=%d (%s) sent=%d %s\n",
- Message->GetHeader()->Id,
- (int)Message->GetHeader()->Type,
- Session->GetProto()->GetService(),
- (int)(Now() - Message->GetHeader()->SendTime) / 1000,
- ToString(Status).data());
- return strReturn;
- }
- //////////////////////////////////////////////////////////////////////
- void TBusModuleImpl::CancelJob(TBusJob* job, EMessageStatus status) {
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for CancelJob");
- if (job) {
- job->Cancel(status);
- }
- }
- TString TBusModuleImpl::GetStatus(unsigned flags) {
- Y_UNUSED(flags);
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");
- TString strReturn = Sprintf("JobsInFlight=%d\n", (int)Jobs.size());
- for (auto job : Jobs) {
- //strReturn += job->Job->GetStatus(flags);
- Y_UNUSED(job);
- strReturn += "TODO\n";
- }
- return strReturn;
- }
- TBusModuleConfig::TBusModuleConfig()
- : StarterMaxInFlight(1000)
- {
- }
- TBusModuleConfig::TSecret::TSecret()
- : SchedulePeriod(TDuration::Seconds(1))
- {
- }
- TBusModule::TBusModule(const char* name)
- : Impl(new TBusModuleImpl(this, name))
- {
- }
- TBusModule::~TBusModule() {
- }
- const char* TBusModule::GetName() const {
- return Impl->Name;
- }
- void TBusModule::SetConfig(const TBusModuleConfig& config) {
- Impl->ModuleConfig = config;
- }
- bool TBusModule::StartInput() {
- Y_ABORT_UNLESS(Impl->State == TBusModuleImpl::CREATED, "state check");
- Y_ABORT_UNLESS(!!Impl->Queue, "state check");
- Impl->State = TBusModuleImpl::RUNNING;
- Y_ASSERT(!Impl->ExternalSession);
- TBusServerSessionPtr extSession = CreateExtSession(*Impl->Queue);
- if (extSession != nullptr) {
- Impl->ExternalSession = extSession;
- }
- return true;
- }
- bool TBusModule::Shutdown() {
- Impl->Shutdown();
- return true;
- }
- TBusJob* TBusModule::CreateJobInstance(TBusMessage* message) {
- TBusJob* job = new TBusJob(this, message);
- return job;
- }
- /**
- Example for external session creation:
- TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
- TBusSession* session = CreateDefaultDestination(queue, &ExternalProto, ExternalConfig);
- session->RegisterService(hostname, begin, end);
- return session;
- */
- bool TBusModule::CreatePrivateSessions(TBusMessageQueue* queue) {
- Impl->Queue = queue;
- return true;
- }
- int TBusModule::GetModuleSessionInFlight() const {
- return Impl->Size();
- }
- TIntrusivePtr<TBusModuleInternal> TBusModule::GetInternal() {
- return Impl.Get();
- }
- TBusServerSessionPtr TBusModule::CreateDefaultDestination(
- TBusMessageQueue& queue, TBusProtocol* proto, const TBusServerSessionConfig& config, const TString& name) {
- TBusServerSessionConfig patchedConfig = config;
- patchedConfig.ExecuteOnMessageInWorkerPool = false;
- if (!patchedConfig.Name) {
- patchedConfig.Name = name;
- }
- if (!patchedConfig.Name) {
- patchedConfig.Name = Impl->Name;
- }
- TBusServerSessionPtr session =
- TBusServerSession::Create(proto, Impl->ModuleServerHandler.Get(), patchedConfig, &queue);
- Impl->ServerSessions.push_back(session);
- return session;
- }
- TBusClientSessionPtr TBusModule::CreateDefaultSource(
- TBusMessageQueue& queue, TBusProtocol* proto, const TBusClientSessionConfig& config, const TString& name) {
- TBusClientSessionConfig patchedConfig = config;
- patchedConfig.ExecuteOnReplyInWorkerPool = false;
- if (!patchedConfig.Name) {
- patchedConfig.Name = name;
- }
- if (!patchedConfig.Name) {
- patchedConfig.Name = Impl->Name;
- }
- TBusClientSessionPtr session =
- TBusClientSession::Create(proto, Impl->ModuleClientHandler.Get(), patchedConfig, &queue);
- Impl->ClientSessions.push_back(session);
- return session;
- }
- TBusStarter* TBusModule::CreateDefaultStarter(TBusMessageQueue&, const TBusSessionConfig& config) {
- TBusStarter* session = new TBusStarter(this, config);
- Impl->Starters.push_back(session);
- return session;
- }
- void TBusModule::OnClientConnectionEvent(const TClientConnectionEvent& event) {
- Y_UNUSED(event);
- }
- TString TBusModule::GetStatus(unsigned flags) {
- TString strReturn = Sprintf("%s\n", Impl->Name);
- strReturn += Impl->GetStatus(flags);
- return strReturn;
- }
- }
- void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
- Jobs.push_back(jobRunner);
- jobRunner->JobStorageIterator = Jobs.end();
- --jobRunner->JobStorageIterator;
- }
- void TBusModuleImpl::DestroyJob(TJobRunner* job) {
- Y_ASSERT(job->JobStorageIterator != TList<TJobRunner*>::iterator());
- {
- TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for DestroyJob");
- int jobCount = AtomicDecrement(JobCount);
- Y_ABORT_UNLESS(jobCount >= 0, "decremented too much");
- Jobs.erase(job->JobStorageIterator);
- if (AtomicGet(State) == STOPPED) {
- if (jobCount == 0) {
- ShutdownCondVar.BroadCast();
- }
- }
- }
- job->JobStorageIterator = TList<TJobRunner*>::iterator();
- }
- void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
- TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
- Y_ABORT_UNLESS(!!msg);
- THolder<TJobRunner> jobRunner(new TJobRunner(Module->CreateJobInstance(msg)));
- jobRunner->Job->MessageHolder.Reset(msg0.Release());
- jobRunner->Job->OnMessageContext.Swap(context);
- SetJob(jobRunner->Job->Message, jobRunner.Get());
- AtomicIncrement(JobCount);
- AddJob(jobRunner.Get());
- jobRunner.Release()->Schedule();
- }
- void TBusModuleImpl::Shutdown() {
- if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
- AtomicSet(State, TBusModuleImpl::STOPPED);
- return;
- }
- AtomicSet(State, TBusModuleImpl::STOPPED);
- for (auto& clientSession : ClientSessions) {
- clientSession->Shutdown();
- }
- for (auto& serverSession : ServerSessions) {
- serverSession->Shutdown();
- }
- for (size_t starter = 0; starter < Starters.size(); ++starter) {
- Starters[starter]->Shutdown();
- }
- {
- TWhatThreadDoesAcquireGuard<TMutex> guard(Lock, "modules: acquiring lock for Shutdown");
- for (auto& Job : Jobs) {
- Job->Schedule();
- }
- while (!Jobs.empty()) {
- ShutdownCondVar.WaitI(Lock);
- }
- }
- }
- EMessageStatus TBusModule::StartJob(TAutoPtr<TBusMessage> message) {
- Y_ABORT_UNLESS(Impl->State == TBusModuleImpl::RUNNING);
- Y_ABORT_UNLESS(!!Impl->Queue);
- if ((unsigned)AtomicGet(Impl->JobCount) >= Impl->ModuleConfig.StarterMaxInFlight) {
- return MESSAGE_BUSY;
- }
- TOnMessageContext dummy;
- Impl->OnMessageReceived(message.Release(), dummy);
- return MESSAGE_OK;
- }
- void TModuleServerHandler::OnMessage(TOnMessageContext& msg) {
- Module->OnMessageReceived(nullptr, msg);
- }
- void TModuleClientHandler::OnReply(TAutoPtr<TBusMessage> req, TAutoPtr<TBusMessage> resp) {
- TJobRunner* job = GetJob(req.Get());
- Y_ASSERT(job);
- Y_ASSERT(job->Job->Message != req.Get());
- job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), resp.Release(), MESSAGE_OK));
- job->UnRef();
- }
- void TModuleClientHandler::OnMessageSentOneWay(TAutoPtr<TBusMessage> req) {
- TJobRunner* job = GetJob(req.Get());
- Y_ASSERT(job);
- Y_ASSERT(job->Job->Message != req.Get());
- job->EnqueueAndSchedule(TJobResponseMessage(req.Release(), nullptr, MESSAGE_OK));
- job->UnRef();
- }
- void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus status) {
- TJobRunner* job = GetJob(msg.Get());
- if (job) {
- Y_ASSERT(job->Job->Message != msg.Get());
- job->EnqueueAndSchedule(TJobResponseMessage(msg.Release(), nullptr, status));
- job->UnRef();
- }
- }
- void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
- Module->OnClientConnectionEvent(event);
- }
|