123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- #include "io_service_impl.h"
- #include <library/cpp/coroutine/engine/poller.h>
- using namespace NAsio;
- void TFdOperation::AddOp(TIOService::TImpl& srv) {
- srv.AddOp(this);
- }
- void TFdOperation::Finalize() {
- (*PH_)->DelOp(this);
- }
- void TPollFdEventHandler::ExecuteOperations(TFdOperations& oprs, int errorCode) {
- TFdOperations::iterator it = oprs.begin();
- try {
- while (it != oprs.end()) {
- TFdOperation* op = it->Get();
- if (op->Execute(errorCode)) { // throw ?
- if (op->IsRequiredRepeat()) {
- Srv_.UpdateOpDeadline(op);
- ++it; //operation completed, but want be repeated
- } else {
- FinishedOperations_.push_back(*it);
- it = oprs.erase(it);
- }
- } else {
- ++it; //operation not completed
- }
- }
- } catch (...) {
- if (it != oprs.end()) {
- FinishedOperations_.push_back(*it);
- oprs.erase(it);
- }
- throw;
- }
- }
- void TPollFdEventHandler::DelOp(TFdOperation* op) {
- TAutoPtr<TPollFdEventHandler>& evh = *op->PH_;
- if (op->IsPollRead()) {
- Y_ASSERT(FinishOp(ReadOperations_, op));
- } else {
- Y_ASSERT(FinishOp(WriteOperations_, op));
- }
- Srv_.FixHandledEvents(evh); //alarm, - 'this' can be destroyed here!
- }
- void TInterrupterHandler::OnFdEvent(int status, ui16 filter) {
- if (!status && (filter & CONT_POLL_READ)) {
- PI_.Reset();
- }
- }
- void TIOService::TImpl::Run() {
- TEvh& iEvh = Evh_.Get(I_.Fd());
- iEvh.Reset(new TInterrupterHandler(*this, I_));
- TInterrupterKeeper ik(*this, iEvh);
- Y_UNUSED(ik);
- IPollerFace::TEvents evs;
- AtomicSet(NeedCheckOpQueue_, 1);
- TInstant deadline;
- while (Y_LIKELY(!Aborted_ && (AtomicGet(OutstandingWork_) || FdEventHandlersCnt_ > 1 || TimersOpCnt_ || AtomicGet(NeedCheckOpQueue_)))) {
- //while
- // expected work (external flag)
- // or have event handlers (exclude interrupter)
- // or have not completed timer operation
- // or have any operation in queues
- AtomicIncrement(IsWaiting_);
- if (!AtomicGet(NeedCheckOpQueue_)) {
- P_->Wait(evs, deadline);
- }
- AtomicDecrement(IsWaiting_);
- if (evs.size()) {
- for (IPollerFace::TEvents::const_iterator iev = evs.begin(); iev != evs.end() && !Aborted_; ++iev) {
- const IPollerFace::TEvent& ev = *iev;
- TEvh& evh = *(TEvh*)ev.Data;
- if (!evh) {
- continue; //op. cancel (see ProcessOpQueue) can destroy evh
- }
- int status = ev.Status;
- if (ev.Status == EIO) {
- int error = status;
- if (GetSockOpt(evh->Fd(), SOL_SOCKET, SO_ERROR, error) == 0) {
- status = error;
- }
- }
- OnFdEvent(evh, status, ev.Filter); //here handle fd events
- //immediatly after handling events for one descriptor check op. queue
- //often queue can contain another operation for this fd (next async read as sample)
- //so we can optimize redundant epoll_ctl (or similar) calls
- ProcessOpQueue();
- }
- evs.clear();
- } else {
- ProcessOpQueue();
- }
- deadline = DeadlinesQueue_.NextDeadline(); //here handle timeouts/process timers
- }
- }
- void TIOService::TImpl::Abort() {
- class TAbortOperation: public TNoneOperation {
- public:
- TAbortOperation(TIOService::TImpl& srv)
- : TNoneOperation()
- , Srv_(srv)
- {
- Speculative_ = true;
- }
- private:
- bool Execute(int errorCode) override {
- Y_UNUSED(errorCode);
- Srv_.ProcessAbort();
- return true;
- }
- TIOService::TImpl& Srv_;
- };
- AtomicSet(HasAbort_, 1);
- ScheduleOp(new TAbortOperation(*this));
- }
- void TIOService::TImpl::ProcessAbort() {
- Aborted_ = true;
- for (int fd = 0; fd <= MaxFd_; ++fd) {
- TEvh& evh = Evh_.Get(fd);
- if (!!evh && evh->Fd() != I_.Fd()) {
- OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
- }
- }
- for (auto t : Timers_) {
- t->FailOperations(ECANCELED);
- }
- TOperationPtr op;
- while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
- try {
- op->Execute(ECANCELED);
- } catch (...) {
- }
- op.Destroy();
- }
- }
|