io_service_impl.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #include "io_service_impl.h"
  2. #include <library/cpp/coroutine/engine/poller.h>
  3. using namespace NAsio;
  4. void TFdOperation::AddOp(TIOService::TImpl& srv) {
  5. srv.AddOp(this);
  6. }
  7. void TFdOperation::Finalize() {
  8. (*PH_)->DelOp(this);
  9. }
  10. void TPollFdEventHandler::ExecuteOperations(TFdOperations& oprs, int errorCode) {
  11. TFdOperations::iterator it = oprs.begin();
  12. try {
  13. while (it != oprs.end()) {
  14. TFdOperation* op = it->Get();
  15. if (op->Execute(errorCode)) { // throw ?
  16. if (op->IsRequiredRepeat()) {
  17. Srv_.UpdateOpDeadline(op);
  18. ++it; //operation completed, but want be repeated
  19. } else {
  20. FinishedOperations_.push_back(*it);
  21. it = oprs.erase(it);
  22. }
  23. } else {
  24. ++it; //operation not completed
  25. }
  26. }
  27. } catch (...) {
  28. if (it != oprs.end()) {
  29. FinishedOperations_.push_back(*it);
  30. oprs.erase(it);
  31. }
  32. throw;
  33. }
  34. }
  35. void TPollFdEventHandler::DelOp(TFdOperation* op) {
  36. TAutoPtr<TPollFdEventHandler>& evh = *op->PH_;
  37. if (op->IsPollRead()) {
  38. Y_ASSERT(FinishOp(ReadOperations_, op));
  39. } else {
  40. Y_ASSERT(FinishOp(WriteOperations_, op));
  41. }
  42. Srv_.FixHandledEvents(evh); //alarm, - 'this' can be destroyed here!
  43. }
  44. void TInterrupterHandler::OnFdEvent(int status, ui16 filter) {
  45. if (!status && (filter & CONT_POLL_READ)) {
  46. PI_.Reset();
  47. }
  48. }
  49. void TIOService::TImpl::Run() {
  50. TEvh& iEvh = Evh_.Get(I_.Fd());
  51. iEvh.Reset(new TInterrupterHandler(*this, I_));
  52. TInterrupterKeeper ik(*this, iEvh);
  53. Y_UNUSED(ik);
  54. IPollerFace::TEvents evs;
  55. AtomicSet(NeedCheckOpQueue_, 1);
  56. TInstant deadline;
  57. while (Y_LIKELY(!Aborted_ && (AtomicGet(OutstandingWork_) || FdEventHandlersCnt_ > 1 || TimersOpCnt_ || AtomicGet(NeedCheckOpQueue_)))) {
  58. //while
  59. // expected work (external flag)
  60. // or have event handlers (exclude interrupter)
  61. // or have not completed timer operation
  62. // or have any operation in queues
  63. AtomicIncrement(IsWaiting_);
  64. if (!AtomicGet(NeedCheckOpQueue_)) {
  65. P_->Wait(evs, deadline);
  66. }
  67. AtomicDecrement(IsWaiting_);
  68. if (evs.size()) {
  69. for (IPollerFace::TEvents::const_iterator iev = evs.begin(); iev != evs.end() && !Aborted_; ++iev) {
  70. const IPollerFace::TEvent& ev = *iev;
  71. TEvh& evh = *(TEvh*)ev.Data;
  72. if (!evh) {
  73. continue; //op. cancel (see ProcessOpQueue) can destroy evh
  74. }
  75. int status = ev.Status;
  76. if (ev.Status == EIO) {
  77. int error = status;
  78. if (GetSockOpt(evh->Fd(), SOL_SOCKET, SO_ERROR, error) == 0) {
  79. status = error;
  80. }
  81. }
  82. OnFdEvent(evh, status, ev.Filter); //here handle fd events
  83. //immediatly after handling events for one descriptor check op. queue
  84. //often queue can contain another operation for this fd (next async read as sample)
  85. //so we can optimize redundant epoll_ctl (or similar) calls
  86. ProcessOpQueue();
  87. }
  88. evs.clear();
  89. } else {
  90. ProcessOpQueue();
  91. }
  92. deadline = DeadlinesQueue_.NextDeadline(); //here handle timeouts/process timers
  93. }
  94. }
  95. void TIOService::TImpl::Abort() {
  96. class TAbortOperation: public TNoneOperation {
  97. public:
  98. TAbortOperation(TIOService::TImpl& srv)
  99. : TNoneOperation()
  100. , Srv_(srv)
  101. {
  102. Speculative_ = true;
  103. }
  104. private:
  105. bool Execute(int errorCode) override {
  106. Y_UNUSED(errorCode);
  107. Srv_.ProcessAbort();
  108. return true;
  109. }
  110. TIOService::TImpl& Srv_;
  111. };
  112. AtomicSet(HasAbort_, 1);
  113. ScheduleOp(new TAbortOperation(*this));
  114. }
  115. void TIOService::TImpl::ProcessAbort() {
  116. Aborted_ = true;
  117. for (int fd = 0; fd <= MaxFd_; ++fd) {
  118. TEvh& evh = Evh_.Get(fd);
  119. if (!!evh && evh->Fd() != I_.Fd()) {
  120. OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
  121. }
  122. }
  123. for (auto t : Timers_) {
  124. t->FailOperations(ECANCELED);
  125. }
  126. TOperationPtr op;
  127. while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
  128. try {
  129. op->Execute(ECANCELED);
  130. } catch (...) {
  131. }
  132. op.Destroy();
  133. }
  134. }