#include "multiclient.h" #include "utils.h" #include #include namespace { using namespace NNeh; struct TCompareDeadline { template static inline bool Compare(const T& l, const T& r) noexcept { return l.Deadline() < r.Deadline() || (l.Deadline() == r.Deadline() && &l < &r); } }; class TMultiClient: public IMultiClient, public TThrRefBase { class TRequestSupervisor: public TRbTreeItem, public IOnRecv, public TThrRefBase, public TNonCopyable { private: TRequestSupervisor() { } //disable public: inline TRequestSupervisor(const TRequest& request, TMultiClient* mc) noexcept : MC_(mc) , Request_(request) , Maked_(0) , FinishOnMakeRequest_(0) , Handled_(0) , Dequeued_(false) { } inline TInstant Deadline() const noexcept { return Request_.Deadline; } //not thread safe (can be called at some time from TMultiClient::Request() and TRequestSupervisor::OnNotify()) void OnMakeRequest(THandleRef h) noexcept { //request can be mark as maked only once, so only one/first call set handle if (AtomicCas(&Maked_, 1, 0)) { H_.Swap(h); //[paranoid mode on] make sure handle be initiated before return AtomicSet(FinishOnMakeRequest_, 1); } else { while (!AtomicGet(FinishOnMakeRequest_)) { SpinLockPause(); } //[paranoid mode off] } } void FillEvent(TEvent& ev) noexcept { ev.Hndl = H_; FillEventUserData(ev); } void FillEventUserData(TEvent& ev) noexcept { ev.UserData = Request_.UserData; } void ResetRequest() noexcept { //destroy keepaliving cross-ref TRequestSupervisor<->THandle H_.Drop(); } //method OnProcessRequest() & OnProcessResponse() executed from Wait() context (thread) void OnEndProcessRequest() { Dequeued_ = true; if (Y_UNLIKELY(IsHandled())) { ResetRequest(); //race - response already handled before processing request from queue } else { MC_->RegisterRequest(this); } } void OnEndProcessResponse() { if (Y_LIKELY(Dequeued_)) { UnLink(); ResetRequest(); } //else request yet not dequeued/registered, so we not need unlink request //(when we later dequeue request OnEndProcessRequest()...IsHandled() return true and we reset request) } //IOnRecv interface void OnNotify(THandle& h) override { if (Y_LIKELY(MarkAsHandled())) { THandleRef hr(&h); OnMakeRequest(hr); //fix race with receiving response before return control from NNeh::Request() MC_->ScheduleResponse(this, hr); } } void OnRecv(THandle&) noexcept override { UnRef(); } void OnEnd() noexcept override { UnRef(); } // //request can be handled only once, so only one/first call MarkAsHandled() return true bool MarkAsHandled() noexcept { return AtomicCas(&Handled_, 1, 0); } bool IsHandled() const noexcept { return AtomicGet(Handled_); } private: TIntrusivePtr MC_; TRequest Request_; THandleRef H_; TAtomic Maked_; TAtomic FinishOnMakeRequest_; TAtomic Handled_; bool Dequeued_; }; typedef TRbTree TRequestsSupervisors; typedef TIntrusivePtr TRequestSupervisorRef; public: TMultiClient() : Interrupt_(false) , NearDeadline_(TInstant::Max().GetValue()) , E_(::TSystemEvent::rAuto) , Shutdown_(false) { } struct TResetRequest { inline void operator()(TRequestSupervisor& rs) const noexcept { rs.ResetRequest(); } }; void Shutdown() { //reset THandleRef's for all exist supervisors and jobs queue (+prevent creating new) //- so we break crossref-chain, which prevent destroy this object THande->TRequestSupervisor->TMultiClient) Shutdown_ = true; RS_.ForEachNoOrder(TResetRequest()); RS_.Clear(); CleanQueue(); } private: class IJob { public: virtual ~IJob() { } virtual bool Process(TEvent&) = 0; virtual void Cancel() = 0; }; typedef TAutoPtr TJobPtr; class TNewRequest: public IJob { public: TNewRequest(TRequestSupervisorRef& rs) : RS_(rs) { } private: bool Process(TEvent&) override { RS_->OnEndProcessRequest(); return false; } void Cancel() override { RS_->ResetRequest(); } TRequestSupervisorRef RS_; }; class TNewResponse: public IJob { public: TNewResponse(TRequestSupervisor* rs, THandleRef& h) noexcept : RS_(rs) , H_(h) { } private: bool Process(TEvent& ev) override { ev.Type = TEvent::Response; ev.Hndl = H_; RS_->FillEventUserData(ev); RS_->OnEndProcessResponse(); return true; } void Cancel() override { RS_->ResetRequest(); } TRequestSupervisorRef RS_; THandleRef H_; }; public: THandleRef Request(const TRequest& request) override { TIntrusivePtr rs(new TRequestSupervisor(request, this)); THandleRef h; try { rs->Ref(); h = NNeh::Request(request.Msg, rs.Get()); //accurately handle race when processing new request event //(we already can receive response (call OnNotify) before we schedule info about new request here) } catch (...) { rs->UnRef(); throw; } rs->OnMakeRequest(h); ScheduleRequest(rs, h, request.Deadline); return h; } bool Wait(TEvent& ev, const TInstant deadline_ = TInstant::Max()) override { while (!Interrupt_) { TInstant deadline = deadline_; const TInstant now = TInstant::Now(); if (deadline != TInstant::Max() && now >= deadline) { break; } { //process jobs queue (requests/responses info) TAutoPtr j; while (JQ_.Dequeue(&j)) { if (j->Process(ev)) { return true; } } } if (!RS_.Empty()) { TRequestSupervisor* nearRS = &*RS_.Begin(); if (nearRS->Deadline() <= now) { if (!nearRS->MarkAsHandled()) { //race with notify, - now in queue must exist response job for this request continue; } ev.Type = TEvent::Timeout; nearRS->FillEvent(ev); nearRS->ResetRequest(); nearRS->UnLink(); return true; } deadline = Min(nearRS->Deadline(), deadline); } if (SetNearDeadline(deadline)) { continue; //update deadline to more far time, so need re-check queue for avoiding race } E_.WaitD(deadline); } Interrupt_ = false; return false; } void Interrupt() override { Interrupt_ = true; Signal(); } size_t QueueSize() override { return JQ_.Size(); } private: void Signal() { //TODO:try optimize - hack with skipping signaling if not have waiters (reduce mutex usage) E_.Signal(); } void ScheduleRequest(TIntrusivePtr& rs, const THandleRef& h, const TInstant& deadline) { TJobPtr j(new TNewRequest(rs)); JQ_.Enqueue(j); if (!h->Signalled()) { if (deadline.GetValue() < GetNearDeadline_()) { Signal(); } } } void ScheduleResponse(TRequestSupervisor* rs, THandleRef& h) { TJobPtr j(new TNewResponse(rs, h)); JQ_.Enqueue(j); if (Y_UNLIKELY(Shutdown_)) { CleanQueue(); } else { Signal(); } } //return true, if deadline re-installed to more late time bool SetNearDeadline(const TInstant& deadline) { bool deadlineMovedFurther = deadline.GetValue() > GetNearDeadline_(); SetNearDeadline_(deadline.GetValue()); return deadlineMovedFurther; } //used only from Wait() void RegisterRequest(TRequestSupervisor* rs) { if (rs->Deadline() != TInstant::Max()) { RS_.Insert(rs); } else { rs->ResetRequest(); //prevent blocking destruction 'endless' requests } } void CleanQueue() { TAutoPtr j; while (JQ_.Dequeue(&j)) { j->Cancel(); } } private: void SetNearDeadline_(const TInstant::TValue& v) noexcept { TGuard g(NDLock_); NearDeadline_.store(v, std::memory_order_release); } TInstant::TValue GetNearDeadline_() const noexcept { TGuard g(NDLock_); return NearDeadline_.load(std::memory_order_acquire); } NNeh::TAutoLockFreeQueue JQ_; TAtomicBool Interrupt_; TRequestsSupervisors RS_; TAdaptiveLock NDLock_; std::atomic NearDeadline_; ::TSystemEvent E_; TAtomicBool Shutdown_; }; class TMultiClientAutoShutdown: public IMultiClient { public: TMultiClientAutoShutdown() : MC_(new TMultiClient()) { } ~TMultiClientAutoShutdown() override { MC_->Shutdown(); } size_t QueueSize() override { return MC_->QueueSize(); } private: THandleRef Request(const TRequest& req) override { return MC_->Request(req); } bool Wait(TEvent& ev, TInstant deadline = TInstant::Max()) override { return MC_->Wait(ev, deadline); } void Interrupt() override { return MC_->Interrupt(); } private: TIntrusivePtr MC_; }; } TMultiClientPtr NNeh::CreateMultiClient() { return new TMultiClientAutoShutdown(); }