#pragma once #include "asio.h" #include "poll_interrupter.h" #include #include #include #include #include #include #include #include #ifdef DEBUG_ASIO #define DBGOUT(args) Cout << args << Endl; #else #define DBGOUT(args) #endif namespace NAsio { #if defined(_arm_) template struct TLockFreeSequence { Y_NO_INLINE T& Get(size_t n) { with_lock (M) { return H[n]; } } TMutex M; THashMap H; }; #else //TODO: copypaste from neh, - need fix template class TLockFreeSequence { public: inline TLockFreeSequence() { memset((void*)T_, 0, sizeof(T_)); } inline ~TLockFreeSequence() { for (size_t i = 0; i < Y_ARRAY_SIZE(T_); ++i) { delete[] T_[i]; } } inline T& Get(size_t n) { const size_t i = GetValueBitCount(n + 1) - 1; return GetList(i)[n + 1 - (((size_t)1) << i)]; } private: inline T* GetList(size_t n) { T* volatile* t = T_ + n; while (!*t) { TArrayHolder nt(new T[((size_t)1) << n]); if (AtomicCas(t, nt.Get(), nullptr)) { return nt.Release(); } } return *t; } private: T* volatile T_[sizeof(size_t) * 8]; }; #endif struct TOperationCompare { template static inline bool Compare(const T& l, const T& r) noexcept { return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); } }; //async operation, execute in contex TIOService()::Run() thread-executor //usualy used for call functors/callbacks class TOperation: public TRbTreeItem, public IHandlingContext { public: TOperation(TInstant deadline = TInstant::Max()) : D_(deadline) , Speculative_(false) , RequiredRepeatExecution_(false) , ND_(deadline) { } //register this operation in svc.impl. virtual void AddOp(TIOService::TImpl&) = 0; //return false, if operation not completed virtual bool Execute(int errorCode = 0) = 0; void ContinueUseHandler(TDeadline deadline) override { RequiredRepeatExecution_ = true; ND_ = deadline; } virtual void Finalize() = 0; inline TInstant Deadline() const noexcept { return D_; } inline TInstant DeadLine() const noexcept { return D_; } inline bool Speculative() const noexcept { return Speculative_; } inline bool IsRequiredRepeat() const noexcept { return RequiredRepeatExecution_; } inline void PrepareReExecution() noexcept { RequiredRepeatExecution_ = false; D_ = ND_; } protected: TInstant D_; bool Speculative_; //if true, operation will be runned immediately after dequeue (even without wating any event) //as sample used for optimisation writing, - obviously in buffers exist space for write bool RequiredRepeatExecution_; //set to true, if required re-exec operation TInstant ND_; //new deadline (for re-exec operation) }; typedef TAutoPtr TOperationPtr; class TNoneOperation: public TOperation { public: TNoneOperation(TInstant deadline = TInstant::Max()) : TOperation(deadline) { } void AddOp(TIOService::TImpl&) override { Y_ASSERT(0); } void Finalize() override { } }; class TPollFdEventHandler; //descriptor use operation class TFdOperation: public TOperation { public: enum TPollType { PollRead, PollWrite }; TFdOperation(SOCKET fd, TPollType pt, TInstant deadline = TInstant::Max()) : TOperation(deadline) , Fd_(fd) , PT_(pt) , PH_(nullptr) { Y_ASSERT(Fd() != INVALID_SOCKET); } inline SOCKET Fd() const noexcept { return Fd_; } inline bool IsPollRead() const noexcept { return PT_ == PollRead; } void AddOp(TIOService::TImpl& srv) override; void Finalize() override; protected: SOCKET Fd_; TPollType PT_; public: TAutoPtr* PH_; }; typedef TAutoPtr TFdOperationPtr; class TPollFdEventHandler { public: TPollFdEventHandler(SOCKET fd, TIOService::TImpl& srv) : Fd_(fd) , HandledEvents_(0) , Srv_(srv) { } virtual ~TPollFdEventHandler() { Y_ASSERT(ReadOperations_.size() == 0); Y_ASSERT(WriteOperations_.size() == 0); } inline void AddReadOp(TFdOperationPtr op) { ReadOperations_.push_back(op); } inline void AddWriteOp(TFdOperationPtr op) { WriteOperations_.push_back(op); } virtual void OnFdEvent(int status, ui16 filter) { DBGOUT("PollEvent(fd=" << Fd_ << ", " << status << ", " << filter << ")"); if (status) { ExecuteOperations(ReadOperations_, status); ExecuteOperations(WriteOperations_, status); } else { if (filter & CONT_POLL_READ) { ExecuteOperations(ReadOperations_, status); } if (filter & CONT_POLL_WRITE) { ExecuteOperations(WriteOperations_, status); } } } typedef TVector TFdOperations; void ExecuteOperations(TFdOperations& oprs, int errorCode); //return true if filter handled events changed and require re-configure events poller virtual bool FixHandledEvents() noexcept { DBGOUT("TPollFdEventHandler::FixHandledEvents()"); ui16 filter = 0; if (WriteOperations_.size()) { filter |= CONT_POLL_WRITE; } if (ReadOperations_.size()) { filter |= CONT_POLL_READ; } if (Y_LIKELY(HandledEvents_ == filter)) { return false; } HandledEvents_ = filter; return true; } inline bool FinishOp(TFdOperations& oprs, TFdOperation* op) noexcept { for (TFdOperations::iterator it = oprs.begin(); it != oprs.end(); ++it) { if (it->Get() == op) { FinishedOperations_.push_back(*it); oprs.erase(it); return true; } } return false; } void DelOp(TFdOperation* op); inline SOCKET Fd() const noexcept { return Fd_; } inline ui16 HandledEvents() const noexcept { return HandledEvents_; } inline void AddHandlingEvent(ui16 ev) noexcept { HandledEvents_ |= ev; } inline void DestroyFinishedOperations() { FinishedOperations_.clear(); } TIOService::TImpl& GetServiceImpl() const noexcept { return Srv_; } protected: SOCKET Fd_; ui16 HandledEvents_; TIOService::TImpl& Srv_; private: TVector ReadOperations_; TVector WriteOperations_; // we can't immediatly destroy finished operations, this can cause closing used socket descriptor Fd_ // (on cascade deletion operation object-handler), but later we use Fd_ for modify handled events at poller, // so we collect here finished operations and destroy it only after update poller, - // call FixHandledEvents(TPollFdEventHandlerPtr&) TVector FinishedOperations_; }; //additional descriptor for poller, used for interrupt current poll wait class TInterrupterHandler: public TPollFdEventHandler { public: TInterrupterHandler(TIOService::TImpl& srv, TPollInterrupter& pi) : TPollFdEventHandler(pi.Fd(), srv) , PI_(pi) { HandledEvents_ = CONT_POLL_READ; } ~TInterrupterHandler() override { DBGOUT("~TInterrupterHandler"); } void OnFdEvent(int status, ui16 filter) override; bool FixHandledEvents() noexcept override { DBGOUT("TInterrupterHandler::FixHandledEvents()"); return false; } private: TPollInterrupter& PI_; }; namespace { inline TAutoPtr CreatePoller() { try { #if defined(_linux_) return IPollerFace::Construct(TStringBuf("epoll")); #endif #if defined(_freebsd_) || defined(_darwin_) return IPollerFace::Construct(TStringBuf("kqueue")); #endif } catch (...) { Cdbg << CurrentExceptionMessage() << Endl; } return IPollerFace::Default(); } } //some equivalent TContExecutor class TIOService::TImpl: public TNonCopyable { public: typedef TAutoPtr TEvh; typedef TLockFreeSequence TEventHandlers; class TTimer { public: typedef THashSet TOperations; TTimer(TIOService::TImpl& srv) : Srv_(srv) { } virtual ~TTimer() { FailOperations(ECANCELED); } void AddOp(TOperation* op) { THolder tmp(op); Operations_.insert(op); Y_UNUSED(tmp.Release()); Srv_.RegisterOpDeadline(op); Srv_.IncTimersOp(); } void DelOp(TOperation* op) { TOperations::iterator it = Operations_.find(op); if (it != Operations_.end()) { Srv_.DecTimersOp(); delete op; Operations_.erase(it); } } inline void FailOperations(int ec) { for (auto operation : Operations_) { try { operation->Execute(ec); //throw ? } catch (...) { } Srv_.DecTimersOp(); delete operation; } Operations_.clear(); } TIOService::TImpl& GetIOServiceImpl() const noexcept { return Srv_; } protected: TIOService::TImpl& Srv_; THashSet Operations_; }; class TTimers: public THashSet { public: ~TTimers() { for (auto it : *this) { delete it; } } }; TImpl() : P_(CreatePoller()) , DeadlinesQueue_(*this) { } ~TImpl() { TOperationPtr op; while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations try { op->Execute(ECANCELED); } catch (...) { } op.Destroy(); } } //similar TContExecutor::Execute() or io_service::run() //process event loop (exit if none to do (no timers or event handlers)) void Run(); //enqueue functor fo call in Run() eventloop (thread safing) inline void Post(TCompletionHandler h) { class TFuncOperation: public TNoneOperation { public: TFuncOperation(TCompletionHandler completionHandler) : TNoneOperation() , H_(std::move(completionHandler)) { Speculative_ = true; } private: //return false, if operation not completed bool Execute(int errorCode) override { Y_UNUSED(errorCode); H_(); return true; } TCompletionHandler H_; }; ScheduleOp(new TFuncOperation(std::move(h))); } //cancel all current operations (handlers be called with errorCode == ECANCELED) void Abort(); bool HasAbort() { return AtomicGet(HasAbort_); } inline void ScheduleOp(TOperationPtr op) { //throw std::bad_alloc Y_ASSERT(!Aborted_); Y_ASSERT(!!op); OpQueue_.Enqueue(op); Interrupt(); } inline void Interrupt() noexcept { AtomicSet(NeedCheckOpQueue_, 1); if (AtomicAdd(IsWaiting_, 0) == 1) { I_.Interrupt(); } } inline void UpdateOpDeadline(TOperation* op) { TInstant oldDeadline = op->Deadline(); op->PrepareReExecution(); if (oldDeadline == op->Deadline()) { return; } if (oldDeadline != TInstant::Max()) { op->UnLink(); } if (op->Deadline() != TInstant::Max()) { DeadlinesQueue_.Register(op); } } void SyncRegisterTimer(TTimer* t) { Timers_.insert(t); } inline void SyncUnregisterAndDestroyTimer(TTimer* t) { Timers_.erase(t); delete t; } inline void IncTimersOp() noexcept { ++TimersOpCnt_; } inline void DecTimersOp() noexcept { --TimersOpCnt_; } inline void WorkStarted() { AtomicIncrement(OutstandingWork_); } inline void WorkFinished() { if (AtomicDecrement(OutstandingWork_) == 0) { Interrupt(); } } private: void ProcessAbort(); inline TEvh& EnsureGetEvh(SOCKET fd) { TEvh& evh = Evh_.Get(fd); if (!evh) { evh.Reset(new TPollFdEventHandler(fd, *this)); } return evh; } inline void OnTimeoutOp(TOperation* op) { DBGOUT("OnTimeoutOp"); try { op->Execute(ETIMEDOUT); //throw ? } catch (...) { op->Finalize(); throw; } if (op->IsRequiredRepeat()) { //operation not completed UpdateOpDeadline(op); } else { //destroy operation structure op->Finalize(); } } public: inline void FixHandledEvents(TEvh& evh) { if (!!evh) { if (evh->FixHandledEvents()) { if (!evh->HandledEvents()) { DelEventHandler(evh); evh.Destroy(); } else { ModEventHandler(evh); evh->DestroyFinishedOperations(); } } else { evh->DestroyFinishedOperations(); } } } private: inline TEvh& GetHandlerForOp(TFdOperation* op) { TEvh& evh = EnsureGetEvh(op->Fd()); op->PH_ = &evh; return evh; } void ProcessOpQueue() { if (!AtomicGet(NeedCheckOpQueue_)) { return; } AtomicSet(NeedCheckOpQueue_, 0); TOperationPtr op; while (OpQueue_.Dequeue(&op)) { if (op->Speculative()) { if (op->Execute(Y_UNLIKELY(Aborted_) ? ECANCELED : 0)) { op.Destroy(); continue; //operation completed } if (!op->IsRequiredRepeat()) { op->PrepareReExecution(); } } RegisterOpDeadline(op.Get()); op.Get()->AddOp(*this); // ... -> AddOp() Y_UNUSED(op.Release()); } } inline void RegisterOpDeadline(TOperation* op) { if (op->DeadLine() != TInstant::Max()) { DeadlinesQueue_.Register(op); } } public: inline void AddOp(TFdOperation* op) { DBGOUT("AddOp(" << op->Fd() << ")"); TEvh& evh = GetHandlerForOp(op); if (op->IsPollRead()) { evh->AddReadOp(op); EnsureEventHandled(evh, CONT_POLL_READ); } else { evh->AddWriteOp(op); EnsureEventHandled(evh, CONT_POLL_WRITE); } } private: inline void EnsureEventHandled(TEvh& evh, ui16 ev) { if (!evh->HandledEvents()) { evh->AddHandlingEvent(ev); AddEventHandler(evh); } else { if ((evh->HandledEvents() & ev) == 0) { evh->AddHandlingEvent(ev); ModEventHandler(evh); } } } public: //cancel all current operations for socket //method MUST be called from Run() thread-executor void CancelFdOp(SOCKET fd) { TEvh& evh = Evh_.Get(fd); if (!evh) { return; } OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE); } private: //helper for fixing handled events even in case exception struct TExceptionProofFixerHandledEvents { TExceptionProofFixerHandledEvents(TIOService::TImpl& srv, TEvh& iEvh) : Srv_(srv) , Evh_(iEvh) { } ~TExceptionProofFixerHandledEvents() { Srv_.FixHandledEvents(Evh_); } TIOService::TImpl& Srv_; TEvh& Evh_; }; inline void OnFdEvent(TEvh& evh, int status, ui16 filter) { TExceptionProofFixerHandledEvents fixer(*this, evh); Y_UNUSED(fixer); evh->OnFdEvent(status, filter); } inline void AddEventHandler(TEvh& evh) { if (evh->Fd() > MaxFd_) { MaxFd_ = evh->Fd(); } SetEventHandler(&evh, evh->Fd(), evh->HandledEvents()); ++FdEventHandlersCnt_; } inline void ModEventHandler(TEvh& evh) { SetEventHandler(&evh, evh->Fd(), evh->HandledEvents()); } inline void DelEventHandler(TEvh& evh) { SetEventHandler(&evh, evh->Fd(), 0); --FdEventHandlersCnt_; } inline void SetEventHandler(void* h, int fd, ui16 flags) { DBGOUT("SetEventHandler(" << fd << ", " << flags << ")"); P_->Set(h, fd, flags); } //exception safe call DelEventHandler struct TInterrupterKeeper { TInterrupterKeeper(TImpl& srv, TEvh& iEvh) : Srv_(srv) , Evh_(iEvh) { Srv_.AddEventHandler(Evh_); } ~TInterrupterKeeper() { Srv_.DelEventHandler(Evh_); } TImpl& Srv_; TEvh& Evh_; }; TAutoPtr P_; TPollInterrupter I_; TAtomic IsWaiting_ = 0; TAtomic NeedCheckOpQueue_ = 0; TAtomic OutstandingWork_ = 0; NNeh::TAutoLockFreeQueue OpQueue_; TEventHandlers Evh_; //i/o event handlers TTimers Timers_; //timeout event handlers size_t FdEventHandlersCnt_ = 0; //i/o event handlers counter size_t TimersOpCnt_ = 0; //timers op counter SOCKET MaxFd_ = 0; //max used descriptor num TAtomic HasAbort_ = 0; bool Aborted_ = false; class TDeadlinesQueue { public: TDeadlinesQueue(TIOService::TImpl& srv) : Srv_(srv) { } inline void Register(TOperation* op) { Deadlines_.Insert(op); } TInstant NextDeadline() { TDeadlines::TIterator it = Deadlines_.Begin(); while (it != Deadlines_.End()) { if (it->DeadLine() > TInstant::Now()) { DBGOUT("TDeadlinesQueue::NewDeadline:" << (it->DeadLine().GetValue() - TInstant::Now().GetValue())); return it->DeadLine(); } TOperation* op = &*(it++); Srv_.OnTimeoutOp(op); } return Deadlines_.Empty() ? TInstant::Max() : Deadlines_.Begin()->DeadLine(); } private: typedef TRbTree TDeadlines; TDeadlines Deadlines_; TIOService::TImpl& Srv_; }; TDeadlinesQueue DeadlinesQueue_; }; }