|
#include "multiclient.h"
#include "utils.h"

#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>

#include <atomic>
#include <functional>
- namespace {
- using namespace NNeh;
// Strict ordering of items by their Deadline(), with the item address as tie-breaker,
// so that several items sharing one deadline can coexist in an ordered container.
struct TCompareDeadline {
    // Returns true when l must be ordered before r.
    // T must provide Deadline() returning a value comparable with < and ==.
    template <class T>
    static inline bool Compare(const T& l, const T& r) noexcept {
        const auto lDeadline = l.Deadline();
        const auto rDeadline = r.Deadline();
        if (lDeadline < rDeadline) {
            return true;
        }
        if (!(lDeadline == rDeadline)) {
            return false;
        }
        // Equal deadlines: order by address. The built-in < on pointers to unrelated
        // objects gives an unspecified result, std::less guarantees a strict total order.
        return std::less<const T*>()(&l, &r);
    }
};
- class TMultiClient: public IMultiClient, public TThrRefBase {
- class TRequestSupervisor: public TRbTreeItem<TRequestSupervisor, TCompareDeadline>, public IOnRecv, public TThrRefBase, public TNonCopyable {
- private:
- TRequestSupervisor() {
- } //disable
- public:
- inline TRequestSupervisor(const TRequest& request, TMultiClient* mc) noexcept
- : MC_(mc)
- , Request_(request)
- , Maked_(0)
- , FinishOnMakeRequest_(0)
- , Handled_(0)
- , Dequeued_(false)
- {
- }
- inline TInstant Deadline() const noexcept {
- return Request_.Deadline;
- }
- //not thread safe (can be called at some time from TMultiClient::Request() and TRequestSupervisor::OnNotify())
- void OnMakeRequest(THandleRef h) noexcept {
- //request can be mark as maked only once, so only one/first call set handle
- if (AtomicCas(&Maked_, 1, 0)) {
- H_.Swap(h);
- //[paranoid mode on] make sure handle be initiated before return
- AtomicSet(FinishOnMakeRequest_, 1);
- } else {
- while (!AtomicGet(FinishOnMakeRequest_)) {
- SpinLockPause();
- }
- //[paranoid mode off]
- }
- }
- void FillEvent(TEvent& ev) noexcept {
- ev.Hndl = H_;
- FillEventUserData(ev);
- }
- void FillEventUserData(TEvent& ev) noexcept {
- ev.UserData = Request_.UserData;
- }
- void ResetRequest() noexcept { //destroy keepaliving cross-ref TRequestSupervisor<->THandle
- H_.Drop();
- }
- //method OnProcessRequest() & OnProcessResponse() executed from Wait() context (thread)
- void OnEndProcessRequest() {
- Dequeued_ = true;
- if (Y_UNLIKELY(IsHandled())) {
- ResetRequest(); //race - response already handled before processing request from queue
- } else {
- MC_->RegisterRequest(this);
- }
- }
- void OnEndProcessResponse() {
- if (Y_LIKELY(Dequeued_)) {
- UnLink();
- ResetRequest();
- } //else request yet not dequeued/registered, so we not need unlink request
- //(when we later dequeue request OnEndProcessRequest()...IsHandled() return true and we reset request)
- }
- //IOnRecv interface
- void OnNotify(THandle& h) override {
- if (Y_LIKELY(MarkAsHandled())) {
- THandleRef hr(&h);
- OnMakeRequest(hr); //fix race with receiving response before return control from NNeh::Request()
- MC_->ScheduleResponse(this, hr);
- }
- }
- void OnRecv(THandle&) noexcept override {
- UnRef();
- }
- void OnEnd() noexcept override {
- UnRef();
- }
- //
- //request can be handled only once, so only one/first call MarkAsHandled() return true
- bool MarkAsHandled() noexcept {
- return AtomicCas(&Handled_, 1, 0);
- }
- bool IsHandled() const noexcept {
- return AtomicGet(Handled_);
- }
- private:
- TIntrusivePtr<TMultiClient> MC_;
- TRequest Request_;
- THandleRef H_;
- TAtomic Maked_;
- TAtomic FinishOnMakeRequest_;
- TAtomic Handled_;
- bool Dequeued_;
- };
- typedef TRbTree<TRequestSupervisor, TCompareDeadline> TRequestsSupervisors;
- typedef TIntrusivePtr<TRequestSupervisor> TRequestSupervisorRef;
- public:
- TMultiClient()
- : Interrupt_(false)
- , NearDeadline_(TInstant::Max().GetValue())
- , E_(::TSystemEvent::rAuto)
- , Shutdown_(false)
- {
- }
- struct TResetRequest {
- inline void operator()(TRequestSupervisor& rs) const noexcept {
- rs.ResetRequest();
- }
- };
- void Shutdown() {
- //reset THandleRef's for all exist supervisors and jobs queue (+prevent creating new)
- //- so we break crossref-chain, which prevent destroy this object THande->TRequestSupervisor->TMultiClient)
- Shutdown_ = true;
- RS_.ForEachNoOrder(TResetRequest());
- RS_.Clear();
- CleanQueue();
- }
- private:
- class IJob {
- public:
- virtual ~IJob() {
- }
- virtual bool Process(TEvent&) = 0;
- virtual void Cancel() = 0;
- };
- typedef TAutoPtr<IJob> TJobPtr;
- class TNewRequest: public IJob {
- public:
- TNewRequest(TRequestSupervisorRef& rs)
- : RS_(rs)
- {
- }
- private:
- bool Process(TEvent&) override {
- RS_->OnEndProcessRequest();
- return false;
- }
- void Cancel() override {
- RS_->ResetRequest();
- }
- TRequestSupervisorRef RS_;
- };
- class TNewResponse: public IJob {
- public:
- TNewResponse(TRequestSupervisor* rs, THandleRef& h) noexcept
- : RS_(rs)
- , H_(h)
- {
- }
- private:
- bool Process(TEvent& ev) override {
- ev.Type = TEvent::Response;
- ev.Hndl = H_;
- RS_->FillEventUserData(ev);
- RS_->OnEndProcessResponse();
- return true;
- }
- void Cancel() override {
- RS_->ResetRequest();
- }
- TRequestSupervisorRef RS_;
- THandleRef H_;
- };
- public:
- THandleRef Request(const TRequest& request) override {
- TIntrusivePtr<TRequestSupervisor> rs(new TRequestSupervisor(request, this));
- THandleRef h;
- try {
- rs->Ref();
- h = NNeh::Request(request.Msg, rs.Get());
- //accurately handle race when processing new request event
- //(we already can receive response (call OnNotify) before we schedule info about new request here)
- } catch (...) {
- rs->UnRef();
- throw;
- }
- rs->OnMakeRequest(h);
- ScheduleRequest(rs, h, request.Deadline);
- return h;
- }
- bool Wait(TEvent& ev, const TInstant deadline_ = TInstant::Max()) override {
- while (!Interrupt_) {
- TInstant deadline = deadline_;
- const TInstant now = TInstant::Now();
- if (deadline != TInstant::Max() && now >= deadline) {
- break;
- }
- { //process jobs queue (requests/responses info)
- TAutoPtr<IJob> j;
- while (JQ_.Dequeue(&j)) {
- if (j->Process(ev)) {
- return true;
- }
- }
- }
- if (!RS_.Empty()) {
- TRequestSupervisor* nearRS = &*RS_.Begin();
- if (nearRS->Deadline() <= now) {
- if (!nearRS->MarkAsHandled()) {
- //race with notify, - now in queue must exist response job for this request
- continue;
- }
- ev.Type = TEvent::Timeout;
- nearRS->FillEvent(ev);
- nearRS->ResetRequest();
- nearRS->UnLink();
- return true;
- }
- deadline = Min(nearRS->Deadline(), deadline);
- }
- if (SetNearDeadline(deadline)) {
- continue; //update deadline to more far time, so need re-check queue for avoiding race
- }
- E_.WaitD(deadline);
- }
- Interrupt_ = false;
- return false;
- }
- void Interrupt() override {
- Interrupt_ = true;
- Signal();
- }
- size_t QueueSize() override {
- return JQ_.Size();
- }
- private:
- void Signal() {
- //TODO:try optimize - hack with skipping signaling if not have waiters (reduce mutex usage)
- E_.Signal();
- }
- void ScheduleRequest(TIntrusivePtr<TRequestSupervisor>& rs, const THandleRef& h, const TInstant& deadline) {
- TJobPtr j(new TNewRequest(rs));
- JQ_.Enqueue(j);
- if (!h->Signalled()) {
- if (deadline.GetValue() < GetNearDeadline_()) {
- Signal();
- }
- }
- }
- void ScheduleResponse(TRequestSupervisor* rs, THandleRef& h) {
- TJobPtr j(new TNewResponse(rs, h));
- JQ_.Enqueue(j);
- if (Y_UNLIKELY(Shutdown_)) {
- CleanQueue();
- } else {
- Signal();
- }
- }
- //return true, if deadline re-installed to more late time
- bool SetNearDeadline(const TInstant& deadline) {
- bool deadlineMovedFurther = deadline.GetValue() > GetNearDeadline_();
- SetNearDeadline_(deadline.GetValue());
- return deadlineMovedFurther;
- }
- //used only from Wait()
- void RegisterRequest(TRequestSupervisor* rs) {
- if (rs->Deadline() != TInstant::Max()) {
- RS_.Insert(rs);
- } else {
- rs->ResetRequest(); //prevent blocking destruction 'endless' requests
- }
- }
- void CleanQueue() {
- TAutoPtr<IJob> j;
- while (JQ_.Dequeue(&j)) {
- j->Cancel();
- }
- }
- private:
- void SetNearDeadline_(const TInstant::TValue& v) noexcept {
- TGuard<TAdaptiveLock> g(NDLock_);
- NearDeadline_.store(v, std::memory_order_release);
- }
- TInstant::TValue GetNearDeadline_() const noexcept {
- TGuard<TAdaptiveLock> g(NDLock_);
- return NearDeadline_.load(std::memory_order_acquire);
- }
- NNeh::TAutoLockFreeQueue<IJob> JQ_;
- TAtomicBool Interrupt_;
- TRequestsSupervisors RS_;
- TAdaptiveLock NDLock_;
- std::atomic<TInstant::TValue> NearDeadline_;
- ::TSystemEvent E_;
- TAtomicBool Shutdown_;
- };
- class TMultiClientAutoShutdown: public IMultiClient {
- public:
- TMultiClientAutoShutdown()
- : MC_(new TMultiClient())
- {
- }
- ~TMultiClientAutoShutdown() override {
- MC_->Shutdown();
- }
- size_t QueueSize() override {
- return MC_->QueueSize();
- }
- private:
- THandleRef Request(const TRequest& req) override {
- return MC_->Request(req);
- }
- bool Wait(TEvent& ev, TInstant deadline = TInstant::Max()) override {
- return MC_->Wait(ev, deadline);
- }
- void Interrupt() override {
- return MC_->Interrupt();
- }
- private:
- TIntrusivePtr<TMultiClient> MC_;
- };
- }
- TMultiClientPtr NNeh::CreateMultiClient() {
- return new TMultiClientAutoShutdown();
- }
|