io_service_impl.h 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. #pragma once
  2. #include "asio.h"
  3. #include "poll_interrupter.h"
  4. #include <library/cpp/neh/lfqueue.h>
  5. #include <library/cpp/neh/pipequeue.h>
  6. #include <library/cpp/dns/cache.h>
  7. #include <util/generic/hash_set.h>
  8. #include <util/network/iovec.h>
  9. #include <util/network/pollerimpl.h>
  10. #include <util/thread/lfqueue.h>
  11. #include <util/thread/factory.h>
  // Debug tracing helper.
  // With DEBUG_ASIO defined, DBGOUT(args) streams `args` to Cout followed by Endl;
  // otherwise it expands to nothing.
  // The enabled form already ends with ';', so a call written as `DBGOUT(...);`
  // yields an extra empty statement.
  12. #ifdef DEBUG_ASIO
  13. #define DBGOUT(args) Cout << args << Endl;
  14. #else
  15. #define DBGOUT(args)
  16. #endif
  17. namespace NAsio {
  18. #if defined(_arm_)
  19. template <typename T>
  20. struct TLockFreeSequence {
  21. Y_NO_INLINE T& Get(size_t n) {
  22. with_lock (M) {
  23. return H[n];
  24. }
  25. }
  26. TMutex M;
  27. THashMap<size_t, T> H;
  28. };
  29. #else
  30. //TODO: copypaste from neh, - need fix
  31. template <class T>
  32. class TLockFreeSequence {
  33. public:
  34. inline TLockFreeSequence() {
  35. memset((void*)T_, 0, sizeof(T_));
  36. }
  37. inline ~TLockFreeSequence() {
  38. for (size_t i = 0; i < Y_ARRAY_SIZE(T_); ++i) {
  39. delete[] T_[i];
  40. }
  41. }
  42. inline T& Get(size_t n) {
  43. const size_t i = GetValueBitCount(n + 1) - 1;
  44. return GetList(i)[n + 1 - (((size_t)1) << i)];
  45. }
  46. private:
  47. inline T* GetList(size_t n) {
  48. T* volatile* t = T_ + n;
  49. while (!*t) {
  50. TArrayHolder<T> nt(new T[((size_t)1) << n]);
  51. if (AtomicCas(t, nt.Get(), nullptr)) {
  52. return nt.Release();
  53. }
  54. }
  55. return *t;
  56. }
  57. private:
  58. T* volatile T_[sizeof(size_t) * 8];
  59. };
  60. #endif
  // Ordering used for the deadline tree: earlier deadline first,
  // objects with equal deadlines are ordered by their addresses.
  struct TOperationCompare {
      template <class T>
      static inline bool Compare(const T& l, const T& r) noexcept {
          if (l.DeadLine() == r.DeadLine()) {
              // tie-break on address so distinct objects never compare equivalent
              return &l < &r;
          }

          return l.DeadLine() < r.DeadLine();
      }
  };
  67. //async operation, execute in contex TIOService()::Run() thread-executor
  68. //usualy used for call functors/callbacks
  69. class TOperation: public TRbTreeItem<TOperation, TOperationCompare>, public IHandlingContext {
  70. public:
  71. TOperation(TInstant deadline = TInstant::Max())
  72. : D_(deadline)
  73. , Speculative_(false)
  74. , RequiredRepeatExecution_(false)
  75. , ND_(deadline)
  76. {
  77. }
  78. //register this operation in svc.impl.
  79. virtual void AddOp(TIOService::TImpl&) = 0;
  80. //return false, if operation not completed
  81. virtual bool Execute(int errorCode = 0) = 0;
  82. void ContinueUseHandler(TDeadline deadline) override {
  83. RequiredRepeatExecution_ = true;
  84. ND_ = deadline;
  85. }
  86. virtual void Finalize() = 0;
  87. inline TInstant Deadline() const noexcept {
  88. return D_;
  89. }
  90. inline TInstant DeadLine() const noexcept {
  91. return D_;
  92. }
  93. inline bool Speculative() const noexcept {
  94. return Speculative_;
  95. }
  96. inline bool IsRequiredRepeat() const noexcept {
  97. return RequiredRepeatExecution_;
  98. }
  99. inline void PrepareReExecution() noexcept {
  100. RequiredRepeatExecution_ = false;
  101. D_ = ND_;
  102. }
  103. protected:
  104. TInstant D_;
  105. bool Speculative_; //if true, operation will be runned immediately after dequeue (even without wating any event)
  106. //as sample used for optimisation writing, - obviously in buffers exist space for write
  107. bool RequiredRepeatExecution_; //set to true, if required re-exec operation
  108. TInstant ND_; //new deadline (for re-exec operation)
  109. };
  110. typedef TAutoPtr<TOperation> TOperationPtr;
  111. class TNoneOperation: public TOperation {
  112. public:
  113. TNoneOperation(TInstant deadline = TInstant::Max())
  114. : TOperation(deadline)
  115. {
  116. }
  117. void AddOp(TIOService::TImpl&) override {
  118. Y_ASSERT(0);
  119. }
  120. void Finalize() override {
  121. }
  122. };
  123. class TPollFdEventHandler;
  124. //descriptor use operation
  125. class TFdOperation: public TOperation {
  126. public:
  127. enum TPollType {
  128. PollRead,
  129. PollWrite
  130. };
  131. TFdOperation(SOCKET fd, TPollType pt, TInstant deadline = TInstant::Max())
  132. : TOperation(deadline)
  133. , Fd_(fd)
  134. , PT_(pt)
  135. , PH_(nullptr)
  136. {
  137. Y_ASSERT(Fd() != INVALID_SOCKET);
  138. }
  139. inline SOCKET Fd() const noexcept {
  140. return Fd_;
  141. }
  142. inline bool IsPollRead() const noexcept {
  143. return PT_ == PollRead;
  144. }
  145. void AddOp(TIOService::TImpl& srv) override;
  146. void Finalize() override;
  147. protected:
  148. SOCKET Fd_;
  149. TPollType PT_;
  150. public:
  151. TAutoPtr<TPollFdEventHandler>* PH_;
  152. };
  153. typedef TAutoPtr<TFdOperation> TFdOperationPtr;
  154. class TPollFdEventHandler {
  155. public:
  156. TPollFdEventHandler(SOCKET fd, TIOService::TImpl& srv)
  157. : Fd_(fd)
  158. , HandledEvents_(0)
  159. , Srv_(srv)
  160. {
  161. }
  162. virtual ~TPollFdEventHandler() {
  163. Y_ASSERT(ReadOperations_.size() == 0);
  164. Y_ASSERT(WriteOperations_.size() == 0);
  165. }
  166. inline void AddReadOp(TFdOperationPtr op) {
  167. ReadOperations_.push_back(op);
  168. }
  169. inline void AddWriteOp(TFdOperationPtr op) {
  170. WriteOperations_.push_back(op);
  171. }
  172. virtual void OnFdEvent(int status, ui16 filter) {
  173. DBGOUT("PollEvent(fd=" << Fd_ << ", " << status << ", " << filter << ")");
  174. if (status) {
  175. ExecuteOperations(ReadOperations_, status);
  176. ExecuteOperations(WriteOperations_, status);
  177. } else {
  178. if (filter & CONT_POLL_READ) {
  179. ExecuteOperations(ReadOperations_, status);
  180. }
  181. if (filter & CONT_POLL_WRITE) {
  182. ExecuteOperations(WriteOperations_, status);
  183. }
  184. }
  185. }
  186. typedef TVector<TFdOperationPtr> TFdOperations;
  187. void ExecuteOperations(TFdOperations& oprs, int errorCode);
  188. //return true if filter handled events changed and require re-configure events poller
  189. virtual bool FixHandledEvents() noexcept {
  190. DBGOUT("TPollFdEventHandler::FixHandledEvents()");
  191. ui16 filter = 0;
  192. if (WriteOperations_.size()) {
  193. filter |= CONT_POLL_WRITE;
  194. }
  195. if (ReadOperations_.size()) {
  196. filter |= CONT_POLL_READ;
  197. }
  198. if (Y_LIKELY(HandledEvents_ == filter)) {
  199. return false;
  200. }
  201. HandledEvents_ = filter;
  202. return true;
  203. }
  204. inline bool FinishOp(TFdOperations& oprs, TFdOperation* op) noexcept {
  205. for (TFdOperations::iterator it = oprs.begin(); it != oprs.end(); ++it) {
  206. if (it->Get() == op) {
  207. FinishedOperations_.push_back(*it);
  208. oprs.erase(it);
  209. return true;
  210. }
  211. }
  212. return false;
  213. }
  214. void DelOp(TFdOperation* op);
  215. inline SOCKET Fd() const noexcept {
  216. return Fd_;
  217. }
  218. inline ui16 HandledEvents() const noexcept {
  219. return HandledEvents_;
  220. }
  221. inline void AddHandlingEvent(ui16 ev) noexcept {
  222. HandledEvents_ |= ev;
  223. }
  224. inline void DestroyFinishedOperations() {
  225. FinishedOperations_.clear();
  226. }
  227. TIOService::TImpl& GetServiceImpl() const noexcept {
  228. return Srv_;
  229. }
  230. protected:
  231. SOCKET Fd_;
  232. ui16 HandledEvents_;
  233. TIOService::TImpl& Srv_;
  234. private:
  235. TVector<TFdOperationPtr> ReadOperations_;
  236. TVector<TFdOperationPtr> WriteOperations_;
  237. // we can't immediatly destroy finished operations, this can cause closing used socket descriptor Fd_
  238. // (on cascade deletion operation object-handler), but later we use Fd_ for modify handled events at poller,
  239. // so we collect here finished operations and destroy it only after update poller, -
  240. // call FixHandledEvents(TPollFdEventHandlerPtr&)
  241. TVector<TFdOperationPtr> FinishedOperations_;
  242. };
  243. //additional descriptor for poller, used for interrupt current poll wait
  244. class TInterrupterHandler: public TPollFdEventHandler {
  245. public:
  246. TInterrupterHandler(TIOService::TImpl& srv, TPollInterrupter& pi)
  247. : TPollFdEventHandler(pi.Fd(), srv)
  248. , PI_(pi)
  249. {
  250. HandledEvents_ = CONT_POLL_READ;
  251. }
  252. ~TInterrupterHandler() override {
  253. DBGOUT("~TInterrupterHandler");
  254. }
  255. void OnFdEvent(int status, ui16 filter) override;
  256. bool FixHandledEvents() noexcept override {
  257. DBGOUT("TInterrupterHandler::FixHandledEvents()");
  258. return false;
  259. }
  260. private:
  261. TPollInterrupter& PI_;
  262. };
  263. namespace {
  264. inline TAutoPtr<IPollerFace> CreatePoller() {
  265. try {
  266. #if defined(_linux_)
  267. return IPollerFace::Construct(TStringBuf("epoll"));
  268. #endif
  269. #if defined(_freebsd_) || defined(_darwin_)
  270. return IPollerFace::Construct(TStringBuf("kqueue"));
  271. #endif
  272. } catch (...) {
  273. Cdbg << CurrentExceptionMessage() << Endl;
  274. }
  275. return IPollerFace::Default();
  276. }
  277. }
  278. //some equivalent TContExecutor
  279. class TIOService::TImpl: public TNonCopyable {
  280. public:
  281. typedef TAutoPtr<TPollFdEventHandler> TEvh;
  282. typedef TLockFreeSequence<TEvh> TEventHandlers;
  283. class TTimer {
  284. public:
  285. typedef THashSet<TOperation*> TOperations;
  286. TTimer(TIOService::TImpl& srv)
  287. : Srv_(srv)
  288. {
  289. }
  290. virtual ~TTimer() {
  291. FailOperations(ECANCELED);
  292. }
  293. void AddOp(TOperation* op) {
  294. THolder<TOperation> tmp(op);
  295. Operations_.insert(op);
  296. Y_UNUSED(tmp.Release());
  297. Srv_.RegisterOpDeadline(op);
  298. Srv_.IncTimersOp();
  299. }
  300. void DelOp(TOperation* op) {
  301. TOperations::iterator it = Operations_.find(op);
  302. if (it != Operations_.end()) {
  303. Srv_.DecTimersOp();
  304. delete op;
  305. Operations_.erase(it);
  306. }
  307. }
  308. inline void FailOperations(int ec) {
  309. for (auto operation : Operations_) {
  310. try {
  311. operation->Execute(ec); //throw ?
  312. } catch (...) {
  313. }
  314. Srv_.DecTimersOp();
  315. delete operation;
  316. }
  317. Operations_.clear();
  318. }
  319. TIOService::TImpl& GetIOServiceImpl() const noexcept {
  320. return Srv_;
  321. }
  322. protected:
  323. TIOService::TImpl& Srv_;
  324. THashSet<TOperation*> Operations_;
  325. };
  326. class TTimers: public THashSet<TTimer*> {
  327. public:
  328. ~TTimers() {
  329. for (auto it : *this) {
  330. delete it;
  331. }
  332. }
  333. };
  334. TImpl()
  335. : P_(CreatePoller())
  336. , DeadlinesQueue_(*this)
  337. {
  338. }
  339. ~TImpl() {
  340. TOperationPtr op;
  341. while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
  342. try {
  343. op->Execute(ECANCELED);
  344. } catch (...) {
  345. }
  346. op.Destroy();
  347. }
  348. }
  349. //similar TContExecutor::Execute() or io_service::run()
  350. //process event loop (exit if none to do (no timers or event handlers))
  351. void Run();
  352. //enqueue functor fo call in Run() eventloop (thread safing)
  353. inline void Post(TCompletionHandler h) {
  354. class TFuncOperation: public TNoneOperation {
  355. public:
  356. TFuncOperation(TCompletionHandler completionHandler)
  357. : TNoneOperation()
  358. , H_(std::move(completionHandler))
  359. {
  360. Speculative_ = true;
  361. }
  362. private:
  363. //return false, if operation not completed
  364. bool Execute(int errorCode) override {
  365. Y_UNUSED(errorCode);
  366. H_();
  367. return true;
  368. }
  369. TCompletionHandler H_;
  370. };
  371. ScheduleOp(new TFuncOperation(std::move(h)));
  372. }
  373. //cancel all current operations (handlers be called with errorCode == ECANCELED)
  374. void Abort();
  375. bool HasAbort() {
  376. return AtomicGet(HasAbort_);
  377. }
  378. inline void ScheduleOp(TOperationPtr op) { //throw std::bad_alloc
  379. Y_ASSERT(!Aborted_);
  380. Y_ASSERT(!!op);
  381. OpQueue_.Enqueue(op);
  382. Interrupt();
  383. }
  384. inline void Interrupt() noexcept {
  385. AtomicSet(NeedCheckOpQueue_, 1);
  386. if (AtomicAdd(IsWaiting_, 0) == 1) {
  387. I_.Interrupt();
  388. }
  389. }
  390. inline void UpdateOpDeadline(TOperation* op) {
  391. TInstant oldDeadline = op->Deadline();
  392. op->PrepareReExecution();
  393. if (oldDeadline == op->Deadline()) {
  394. return;
  395. }
  396. if (oldDeadline != TInstant::Max()) {
  397. op->UnLink();
  398. }
  399. if (op->Deadline() != TInstant::Max()) {
  400. DeadlinesQueue_.Register(op);
  401. }
  402. }
  403. void SyncRegisterTimer(TTimer* t) {
  404. Timers_.insert(t);
  405. }
  406. inline void SyncUnregisterAndDestroyTimer(TTimer* t) {
  407. Timers_.erase(t);
  408. delete t;
  409. }
  410. inline void IncTimersOp() noexcept {
  411. ++TimersOpCnt_;
  412. }
  413. inline void DecTimersOp() noexcept {
  414. --TimersOpCnt_;
  415. }
  416. inline void WorkStarted() {
  417. AtomicIncrement(OutstandingWork_);
  418. }
  419. inline void WorkFinished() {
  420. if (AtomicDecrement(OutstandingWork_) == 0) {
  421. Interrupt();
  422. }
  423. }
  424. private:
  425. void ProcessAbort();
  426. inline TEvh& EnsureGetEvh(SOCKET fd) {
  427. TEvh& evh = Evh_.Get(fd);
  428. if (!evh) {
  429. evh.Reset(new TPollFdEventHandler(fd, *this));
  430. }
  431. return evh;
  432. }
  433. inline void OnTimeoutOp(TOperation* op) {
  434. DBGOUT("OnTimeoutOp");
  435. try {
  436. op->Execute(ETIMEDOUT); //throw ?
  437. } catch (...) {
  438. op->Finalize();
  439. throw;
  440. }
  441. if (op->IsRequiredRepeat()) {
  442. //operation not completed
  443. UpdateOpDeadline(op);
  444. } else {
  445. //destroy operation structure
  446. op->Finalize();
  447. }
  448. }
  449. public:
  450. inline void FixHandledEvents(TEvh& evh) {
  451. if (!!evh) {
  452. if (evh->FixHandledEvents()) {
  453. if (!evh->HandledEvents()) {
  454. DelEventHandler(evh);
  455. evh.Destroy();
  456. } else {
  457. ModEventHandler(evh);
  458. evh->DestroyFinishedOperations();
  459. }
  460. } else {
  461. evh->DestroyFinishedOperations();
  462. }
  463. }
  464. }
  465. private:
  466. inline TEvh& GetHandlerForOp(TFdOperation* op) {
  467. TEvh& evh = EnsureGetEvh(op->Fd());
  468. op->PH_ = &evh;
  469. return evh;
  470. }
  471. void ProcessOpQueue() {
  472. if (!AtomicGet(NeedCheckOpQueue_)) {
  473. return;
  474. }
  475. AtomicSet(NeedCheckOpQueue_, 0);
  476. TOperationPtr op;
  477. while (OpQueue_.Dequeue(&op)) {
  478. if (op->Speculative()) {
  479. if (op->Execute(Y_UNLIKELY(Aborted_) ? ECANCELED : 0)) {
  480. op.Destroy();
  481. continue; //operation completed
  482. }
  483. if (!op->IsRequiredRepeat()) {
  484. op->PrepareReExecution();
  485. }
  486. }
  487. RegisterOpDeadline(op.Get());
  488. op.Get()->AddOp(*this); // ... -> AddOp()
  489. Y_UNUSED(op.Release());
  490. }
  491. }
  492. inline void RegisterOpDeadline(TOperation* op) {
  493. if (op->DeadLine() != TInstant::Max()) {
  494. DeadlinesQueue_.Register(op);
  495. }
  496. }
  497. public:
  498. inline void AddOp(TFdOperation* op) {
  499. DBGOUT("AddOp<Fd>(" << op->Fd() << ")");
  500. TEvh& evh = GetHandlerForOp(op);
  501. if (op->IsPollRead()) {
  502. evh->AddReadOp(op);
  503. EnsureEventHandled(evh, CONT_POLL_READ);
  504. } else {
  505. evh->AddWriteOp(op);
  506. EnsureEventHandled(evh, CONT_POLL_WRITE);
  507. }
  508. }
  509. private:
  510. inline void EnsureEventHandled(TEvh& evh, ui16 ev) {
  511. if (!evh->HandledEvents()) {
  512. evh->AddHandlingEvent(ev);
  513. AddEventHandler(evh);
  514. } else {
  515. if ((evh->HandledEvents() & ev) == 0) {
  516. evh->AddHandlingEvent(ev);
  517. ModEventHandler(evh);
  518. }
  519. }
  520. }
  521. public:
  522. //cancel all current operations for socket
  523. //method MUST be called from Run() thread-executor
  524. void CancelFdOp(SOCKET fd) {
  525. TEvh& evh = Evh_.Get(fd);
  526. if (!evh) {
  527. return;
  528. }
  529. OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
  530. }
  531. private:
  532. //helper for fixing handled events even in case exception
  533. struct TExceptionProofFixerHandledEvents {
  534. TExceptionProofFixerHandledEvents(TIOService::TImpl& srv, TEvh& iEvh)
  535. : Srv_(srv)
  536. , Evh_(iEvh)
  537. {
  538. }
  539. ~TExceptionProofFixerHandledEvents() {
  540. Srv_.FixHandledEvents(Evh_);
  541. }
  542. TIOService::TImpl& Srv_;
  543. TEvh& Evh_;
  544. };
  545. inline void OnFdEvent(TEvh& evh, int status, ui16 filter) {
  546. TExceptionProofFixerHandledEvents fixer(*this, evh);
  547. Y_UNUSED(fixer);
  548. evh->OnFdEvent(status, filter);
  549. }
  550. inline void AddEventHandler(TEvh& evh) {
  551. if (evh->Fd() > MaxFd_) {
  552. MaxFd_ = evh->Fd();
  553. }
  554. SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
  555. ++FdEventHandlersCnt_;
  556. }
  557. inline void ModEventHandler(TEvh& evh) {
  558. SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
  559. }
  560. inline void DelEventHandler(TEvh& evh) {
  561. SetEventHandler(&evh, evh->Fd(), 0);
  562. --FdEventHandlersCnt_;
  563. }
  564. inline void SetEventHandler(void* h, int fd, ui16 flags) {
  565. DBGOUT("SetEventHandler(" << fd << ", " << flags << ")");
  566. P_->Set(h, fd, flags);
  567. }
  568. //exception safe call DelEventHandler
  569. struct TInterrupterKeeper {
  570. TInterrupterKeeper(TImpl& srv, TEvh& iEvh)
  571. : Srv_(srv)
  572. , Evh_(iEvh)
  573. {
  574. Srv_.AddEventHandler(Evh_);
  575. }
  576. ~TInterrupterKeeper() {
  577. Srv_.DelEventHandler(Evh_);
  578. }
  579. TImpl& Srv_;
  580. TEvh& Evh_;
  581. };
  582. TAutoPtr<IPollerFace> P_;
  583. TPollInterrupter I_;
  584. TAtomic IsWaiting_ = 0;
  585. TAtomic NeedCheckOpQueue_ = 0;
  586. TAtomic OutstandingWork_ = 0;
  587. NNeh::TAutoLockFreeQueue<TOperation> OpQueue_;
  588. TEventHandlers Evh_; //i/o event handlers
  589. TTimers Timers_; //timeout event handlers
  590. size_t FdEventHandlersCnt_ = 0; //i/o event handlers counter
  591. size_t TimersOpCnt_ = 0; //timers op counter
  592. SOCKET MaxFd_ = 0; //max used descriptor num
  593. TAtomic HasAbort_ = 0;
  594. bool Aborted_ = false;
  595. class TDeadlinesQueue {
  596. public:
  597. TDeadlinesQueue(TIOService::TImpl& srv)
  598. : Srv_(srv)
  599. {
  600. }
  601. inline void Register(TOperation* op) {
  602. Deadlines_.Insert(op);
  603. }
  604. TInstant NextDeadline() {
  605. TDeadlines::TIterator it = Deadlines_.Begin();
  606. while (it != Deadlines_.End()) {
  607. if (it->DeadLine() > TInstant::Now()) {
  608. DBGOUT("TDeadlinesQueue::NewDeadline:" << (it->DeadLine().GetValue() - TInstant::Now().GetValue()));
  609. return it->DeadLine();
  610. }
  611. TOperation* op = &*(it++);
  612. Srv_.OnTimeoutOp(op);
  613. }
  614. return Deadlines_.Empty() ? TInstant::Max() : Deadlines_.Begin()->DeadLine();
  615. }
  616. private:
  617. typedef TRbTree<TOperation, TOperationCompare> TDeadlines;
  618. TDeadlines Deadlines_;
  619. TIOService::TImpl& Srv_;
  620. };
  621. TDeadlinesQueue DeadlinesQueue_;
  622. };
  623. }