12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656 |
- #include "tcp2.h"
- #include "details.h"
- #include "factory.h"
- #include "http_common.h"
- #include "neh.h"
- #include "utils.h"
- #include <library/cpp/dns/cache.h>
- #include <library/cpp/neh/asio/executor.h>
- #include <library/cpp/threading/atomic/bool.h>
- #include <util/generic/buffer.h>
- #include <util/generic/hash.h>
- #include <util/generic/singleton.h>
- #include <util/network/endpoint.h>
- #include <util/network/init.h>
- #include <util/network/iovec.h>
- #include <util/network/socket.h>
- #include <util/string/cast.h>
- #include <atomic>
- //#define DEBUG_TCP2 1
- #ifdef DEBUG_TCP2 // tracing is compiled in only when DEBUG_TCP2 is defined (uncomment the line above to enable it)
- TSpinLock OUT_LOCK; // taken by every DBGOUT expansion so that output lines from different threads do not interleave
- #define DBGOUT(args) \
- { \
- TGuard<TSpinLock> m(OUT_LOCK); \
- Cout << TInstant::Now().GetValue() << " " << args << Endl; \
- }
- #else // tracing disabled: DBGOUT(args) expands to nothing, so `args` is never evaluated
- #define DBGOUT(args)
- #endif
- using namespace std::placeholders;
- namespace NNeh {
- TDuration TTcp2Options::ConnectTimeout = TDuration::MilliSeconds(300);
- size_t TTcp2Options::InputBufferSize = 16000;
- size_t TTcp2Options::AsioClientThreads = 4;
- size_t TTcp2Options::AsioServerThreads = 4;
- int TTcp2Options::Backlog = 100;
- bool TTcp2Options::ClientUseDirectWrite = true;
- bool TTcp2Options::ServerUseDirectWrite = true;
- TDuration TTcp2Options::ServerInputDeadline = TDuration::Seconds(3600);
- TDuration TTcp2Options::ServerOutputDeadline = TDuration::Seconds(10);
- bool TTcp2Options::Set(TStringBuf name, TStringBuf value) {
- #define TCP2_TRY_SET(optType, optName) \
- if (name == TStringBuf(#optName)) { \
- optName = FromString<optType>(value); \
- }
- TCP2_TRY_SET(TDuration, ConnectTimeout)
- else TCP2_TRY_SET(size_t, InputBufferSize) else TCP2_TRY_SET(size_t, AsioClientThreads) else TCP2_TRY_SET(size_t, AsioServerThreads) else TCP2_TRY_SET(int, Backlog) else TCP2_TRY_SET(bool, ClientUseDirectWrite) else TCP2_TRY_SET(bool, ServerUseDirectWrite) else TCP2_TRY_SET(TDuration, ServerInputDeadline) else TCP2_TRY_SET(TDuration, ServerOutputDeadline) else {
- return false;
- }
- return true;
- }
- }
- namespace {
- namespace NNehTcp2 {
- using namespace NAsio;
- using namespace NDns;
- using namespace NNeh;
- const TString canceled = "canceled"; // error text passed to the handle when a client request is cancelled
- const TString emptyReply = "empty reply"; // description returned for the EmptyReply response error code
- inline void PrepareSocket(SOCKET s) {
- SetNoDelay(s, true);
- }
- typedef ui64 TRequestId;
- #pragma pack(push, 1) //disable align struct members (structs mapped to data transmitted other network)
- struct TBaseHeader {
- enum TMessageType {
- Request = 1,
- Response = 2,
- Cancel = 3,
- MaxMessageType
- };
- TBaseHeader(TRequestId id, ui32 headerLength, ui8 version, ui8 mType)
- : Id(id)
- , HeaderLength(headerLength)
- , Version(version)
- , Type(mType)
- {
- }
- TRequestId Id; //message id, - monotonic inc. sequence (skip nil value)
- ui32 HeaderLength;
- ui8 Version; //current version: 1
- ui8 Type; //<- TMessageType (+ in future possible ForceResponse,etc)
- };
- struct TRequestHeader: public TBaseHeader {
- TRequestHeader(TRequestId reqId, size_t servicePathLength, size_t dataSize)
- : TBaseHeader(reqId, sizeof(TRequestHeader) + servicePathLength, 1, (ui8)Request)
- , ContentLength(dataSize)
- {
- }
- ui32 ContentLength;
- };
- struct TResponseHeader: public TBaseHeader {
- enum TErrorCode {
- Success = 0,
- EmptyReply = 1 //not found such service or service not sent response
- ,
- MaxErrorCode
- };
- TResponseHeader(TRequestId reqId, TErrorCode code, size_t dataSize)
- : TBaseHeader(reqId, sizeof(TResponseHeader), 1, (ui8)Response)
- , ErrorCode((ui16)code)
- , ContentLength(dataSize)
- {
- }
- TString ErrorDescription() const {
- if (ErrorCode == (ui16)EmptyReply) {
- return emptyReply;
- }
- TStringStream ss;
- ss << TStringBuf("tcp2 err_code=") << ErrorCode;
- return ss.Str();
- }
- ui16 ErrorCode;
- ui32 ContentLength;
- };
- struct TCancelHeader: public TBaseHeader {
- TCancelHeader(TRequestId reqId)
- : TBaseHeader(reqId, sizeof(TCancelHeader), 1, (ui8)Cancel)
- {
- }
- };
- #pragma pack(pop)
- static const size_t maxHeaderSize = sizeof(TResponseHeader);
- //buffer for read input data, - header + message data
- struct TTcp2Message {
- TTcp2Message()
- : Loader_(&TTcp2Message::LoadBaseHeader)
- , RequireBytesForComplete_(sizeof(TBaseHeader))
- , Header_(sizeof(TBaseHeader))
- {
- }
- void Clear() {
- Loader_ = &TTcp2Message::LoadBaseHeader;
- RequireBytesForComplete_ = sizeof(TBaseHeader);
- Header_.Clear();
- Content_.clear();
- }
- TBuffer& Header() noexcept {
- return Header_;
- }
- const TString& Content() const noexcept {
- return Content_;
- }
- bool IsComplete() const noexcept {
- return RequireBytesForComplete_ == 0;
- }
- size_t LoadFrom(const char* buf, size_t len) {
- return (this->*Loader_)(buf, len);
- }
- const TBaseHeader& BaseHeader() const {
- return *reinterpret_cast<const TBaseHeader*>(Header_.Data());
- }
- const TRequestHeader& RequestHeader() const {
- return *reinterpret_cast<const TRequestHeader*>(Header_.Data());
- }
- const TResponseHeader& ResponseHeader() const {
- return *reinterpret_cast<const TResponseHeader*>(Header_.Data());
- }
- private:
- size_t LoadBaseHeader(const char* buf, size_t len) {
- size_t useBytes = Min<size_t>(sizeof(TBaseHeader) - Header_.Size(), len);
- Header_.Append(buf, useBytes);
- if (Y_UNLIKELY(sizeof(TBaseHeader) > Header_.Size())) {
- //base header yet not complete
- return useBytes;
- }
- {
- const TBaseHeader& hdr = BaseHeader();
- if (BaseHeader().HeaderLength > 32000) { //some heuristic header size limit
- throw yexception() << TStringBuf("to large neh/tcp2 header size: ") << BaseHeader().HeaderLength;
- }
- //header completed
- Header_.Reserve(hdr.HeaderLength);
- }
- const TBaseHeader& hdr = BaseHeader(); //reallocation can move Header_ data to another place, so use fresh 'hdr'
- if (Y_UNLIKELY(hdr.Version != 1)) {
- throw yexception() << TStringBuf("unsupported protocol version: ") << static_cast<unsigned>(hdr.Version);
- }
- RequireBytesForComplete_ = hdr.HeaderLength - sizeof(TBaseHeader);
- return useBytes + LoadHeader(buf + useBytes, len - useBytes);
- }
- size_t LoadHeader(const char* buf, size_t len) {
- size_t useBytes = Min<size_t>(RequireBytesForComplete_, len);
- Header_.Append(buf, useBytes);
- RequireBytesForComplete_ -= useBytes;
- if (RequireBytesForComplete_) {
- //continue load header
- Loader_ = &TTcp2Message::LoadHeader;
- return useBytes;
- }
- const TBaseHeader& hdr = *reinterpret_cast<const TBaseHeader*>(Header_.Data());
- if (hdr.Type == TBaseHeader::Request) {
- if (Header_.Size() < sizeof(TRequestHeader)) {
- throw yexception() << TStringBuf("invalid request header size");
- }
- InitContentLoading(RequestHeader().ContentLength);
- } else if (hdr.Type == TBaseHeader::Response) {
- if (Header_.Size() < sizeof(TResponseHeader)) {
- throw yexception() << TStringBuf("invalid response header size");
- }
- InitContentLoading(ResponseHeader().ContentLength);
- } else if (hdr.Type == TBaseHeader::Cancel) {
- if (Header_.Size() < sizeof(TCancelHeader)) {
- throw yexception() << TStringBuf("invalid cancel header size");
- }
- return useBytes;
- } else {
- throw yexception() << TStringBuf("unsupported request type: ") << static_cast<unsigned>(hdr.Type);
- }
- return useBytes + (this->*Loader_)(buf + useBytes, len - useBytes);
- }
- void InitContentLoading(size_t contentLength) {
- RequireBytesForComplete_ = contentLength;
- Content_.ReserveAndResize(contentLength);
- Loader_ = &TTcp2Message::LoadContent;
- }
- size_t LoadContent(const char* buf, size_t len) {
- size_t curContentSize = Content_.size() - RequireBytesForComplete_;
- size_t useBytes = Min<size_t>(RequireBytesForComplete_, len);
- memcpy(Content_.begin() + curContentSize, buf, useBytes);
- RequireBytesForComplete_ -= useBytes;
- return useBytes;
- }
- private:
- typedef size_t (TTcp2Message::*TLoader)(const char*, size_t);
- TLoader Loader_; //current loader (stages - base-header/header/content)
- size_t RequireBytesForComplete_;
- TBuffer Header_;
- TString Content_;
- };
- //base storage for output data
- class TMultiBuffers {
- public:
- TMultiBuffers()
- : IOVec_(nullptr, 0)
- , DataSize_(0)
- , PoolBytes_(0)
- {
- }
- void Clear() noexcept {
- Parts_.clear();
- DataSize_ = 0;
- PoolBytes_ = 0;
- }
- bool HasFreeSpace() const noexcept {
- return DataSize_ < 64000 && (PoolBytes_ < (MemPoolSize_ - maxHeaderSize));
- }
- bool HasData() const noexcept {
- return Parts_.size();
- }
- TContIOVector* GetIOvec() noexcept {
- return &IOVec_;
- }
- protected:
- void AddPart(const void* buf, size_t len) {
- Parts_.push_back(IOutputStream::TPart(buf, len));
- DataSize_ += len;
- }
- //used for allocate header (MUST be POD type)
- template <typename T>
- inline T* Allocate() noexcept {
- size_t poolBytes = PoolBytes_;
- PoolBytes_ += sizeof(T);
- return (T*)(MemPool_ + poolBytes);
- }
- //used for allocate header (MUST be POD type) + some tail
- template <typename T>
- inline T* AllocatePlus(size_t tailSize) noexcept {
- Y_ASSERT(tailSize <= MemPoolReserve_);
- size_t poolBytes = PoolBytes_;
- PoolBytes_ += sizeof(T) + tailSize;
- return (T*)(MemPool_ + poolBytes);
- }
- protected:
- TContIOVector IOVec_;
- TVector<IOutputStream::TPart> Parts_;
- static const size_t MemPoolSize_ = maxHeaderSize * 100;
- static const size_t MemPoolReserve_ = 32;
- size_t DataSize_;
- size_t PoolBytes_;
- char MemPool_[MemPoolSize_ + MemPoolReserve_];
- };
- //protector for limit usage tcp connection output (and used data) only from one thread at same time
- class TOutputLock {
- public:
- TOutputLock() noexcept
- : Lock_(0)
- {
- }
- bool TryAquire() noexcept {
- do {
- if (AtomicTryLock(&Lock_)) {
- return true;
- }
- } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable
- return false;
- }
- void Release() noexcept {
- AtomicUnlock(&Lock_);
- }
- bool IsFree() const noexcept {
- return !AtomicGet(Lock_);
- }
- private:
- TAtomic Lock_;
- };
- class TClient {
- class TRequest;
- class TConnection;
- typedef TIntrusivePtr<TRequest> TRequestRef;
- typedef TIntrusivePtr<TConnection> TConnectionRef;
- class TRequest: public TThrRefBase, public TNonCopyable {
- public:
- class THandle: public TSimpleHandle {
- public:
- THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
- : TSimpleHandle(f, msg, s)
- {
- }
- bool MessageSendedCompletely() const noexcept override {
- if (TSimpleHandle::MessageSendedCompletely()) {
- return true;
- }
- TRequestRef req = GetRequest();
- if (!!req && req->RequestSendedCompletely()) {
- const_cast<THandle*>(this)->SetSendComplete();
- }
- return TSimpleHandle::MessageSendedCompletely();
- }
- void Cancel() noexcept override {
- if (TSimpleHandle::Canceled()) {
- return;
- }
- TRequestRef req = GetRequest();
- if (!!req) {
- req->Cancel();
- TSimpleHandle::Cancel();
- }
- }
- void NotifyResponse(const TString& resp) {
- TNotifyHandle::NotifyResponse(resp);
- ReleaseRequest();
- }
- void NotifyError(const TString& error) {
- TNotifyHandle::NotifyError(error);
- ReleaseRequest();
- }
- void NotifyError(TErrorRef error) {
- TNotifyHandle::NotifyError(error);
- ReleaseRequest();
- }
- //not thread safe!
- void SetRequest(const TRequestRef& r) noexcept {
- Req_ = r;
- }
- void ReleaseRequest() noexcept {
- TRequestRef tmp;
- TGuard<TSpinLock> g(SP_);
- tmp.Swap(Req_);
- }
- private:
- TRequestRef GetRequest() const noexcept {
- TGuard<TSpinLock> g(SP_);
- return Req_;
- }
- mutable TSpinLock SP_;
- TRequestRef Req_;
- };
- typedef TIntrusivePtr<THandle> THandleRef;
- static void Run(THandleRef& h, const TMessage& msg, TClient& clnt) {
- TRequestRef req(new TRequest(h, msg, clnt));
- h->SetRequest(req);
- req->Run(req);
- }
- ~TRequest() override {
- DBGOUT("TClient::~TRequest()");
- }
- private:
- TRequest(THandleRef& h, TMessage msg, TClient& clnt)
- : Hndl_(h)
- , Clnt_(clnt)
- , Msg_(std::move(msg))
- , Loc_(Msg_.Addr)
- , Addr_(CachedResolve(TResolveInfo(Loc_.Host, Loc_.GetPort())))
- , Canceled_(false)
- , Id_(0)
- {
- DBGOUT("TClient::TRequest()");
- }
- void Run(TRequestRef& req) {
- TDestination& dest = Clnt_.Dest_.Get(Addr_->Id);
- dest.Run(req);
- }
- public:
- void OnResponse(TTcp2Message& msg) {
- DBGOUT("TRequest::OnResponse: " << msg.ResponseHeader().Id);
- THandleRef h = ReleaseHandler();
- if (!h) {
- return;
- }
- const TResponseHeader& respHdr = msg.ResponseHeader();
- if (Y_LIKELY(!respHdr.ErrorCode)) {
- h->NotifyResponse(msg.Content());
- } else {
- h->NotifyError(new TError(respHdr.ErrorDescription(), TError::ProtocolSpecific, respHdr.ErrorCode));
- }
- ReleaseConn();
- }
- void OnError(const TString& err, const i32 systemCode = 0) {
- DBGOUT("TRequest::OnError: " << Id_.load(std::memory_order_acquire));
- THandleRef h = ReleaseHandler();
- if (!h) {
- return;
- }
- h->NotifyError(new TError(err, TError::UnknownType, 0, systemCode));
- ReleaseConn();
- }
- void SetConnection(TConnection* conn) noexcept {
- auto g = Guard(AL_);
- Conn_ = conn;
- }
- bool Canceled() const noexcept {
- return Canceled_;
- }
- const TResolvedHost* Addr() const noexcept {
- return Addr_;
- }
- TStringBuf Service() const noexcept {
- return Loc_.Service;
- }
- const TString& Data() const noexcept {
- return Msg_.Data;
- }
- TClient& Client() noexcept {
- return Clnt_;
- }
- bool RequestSendedCompletely() const noexcept {
- if (Id_.load(std::memory_order_acquire) == 0) {
- return false;
- }
- TConnectionRef conn = GetConn();
- if (!conn) {
- return false;
- }
- TRequestId lastSendedReqId = conn->LastSendedRequestId();
- if (lastSendedReqId >= Id_.load(std::memory_order_acquire)) {
- return true;
- } else if (Y_UNLIKELY((Id_.load(std::memory_order_acquire) - lastSendedReqId) > (Max<TRequestId>() - Max<ui32>()))) {
- //overflow req-id value
- return true;
- }
- return false;
- }
- void Cancel() noexcept {
- Canceled_ = true;
- THandleRef h = ReleaseHandler();
- if (!h) {
- return;
- }
- TConnectionRef conn = ReleaseConn();
- if (!!conn && Id_.load(std::memory_order_acquire)) {
- conn->Cancel(Id_.load(std::memory_order_acquire));
- }
- h->NotifyError(new TError(canceled, TError::Cancelled));
- }
- void SetReqId(TRequestId reqId) noexcept {
- auto guard = Guard(IdLock_);
- Id_.store(reqId, std::memory_order_release);
- }
- TRequestId ReqId() const noexcept {
- return Id_.load(std::memory_order_acquire);
- }
- private:
- inline THandleRef ReleaseHandler() noexcept {
- THandleRef h;
- {
- auto g = Guard(AL_);
- h.Swap(Hndl_);
- }
- return h;
- }
- inline TConnectionRef GetConn() const noexcept {
- auto g = Guard(AL_);
- return Conn_;
- }
- inline TConnectionRef ReleaseConn() noexcept {
- TConnectionRef c;
- {
- auto g = Guard(AL_);
- c.Swap(Conn_);
- }
- return c;
- }
- mutable TAdaptiveLock AL_; //guards Hndl_ and Conn_; taking the handle out under this lock guarantees notify() is called only once (prevents a race between the asio thread and the current one)
- THandleRef Hndl_; // handle to notify; swapped out by ReleaseHandler()
- TClient& Clnt_;
- const TMessage Msg_; // request address + data
- const TParsedLocation Loc_; // parsed form of Msg_.Addr
- const TResolvedHost* Addr_; // result of CachedResolve() for Loc_ host/port
- TConnectionRef Conn_; // connection carrying the request; set by SetConnection(), cleared by ReleaseConn()
- NAtomic::TBool Canceled_; // set by Cancel()
- TSpinLock IdLock_; // held while SetReqId() stores Id_
- std::atomic<TRequestId> Id_; // request id, 0 until SetReqId() is called
- };
- class TConnection: public TThrRefBase {
- enum TState {
- Init,
- Connecting,
- Connected,
- Closed,
- MaxState
- };
- typedef THashMap<TRequestId, TRequestRef> TReqsInFly;
- public:
- class TOutputBuffers: public TMultiBuffers {
- public:
- void AddRequest(const TRequestRef& req) {
- Requests_.push_back(req);
- if (req->Service().size() > MemPoolReserve_) {
- TRequestHeader* hdr = new (Allocate<TRequestHeader>()) TRequestHeader(req->ReqId(), req->Service().size(), req->Data().size());
- AddPart(hdr, sizeof(TRequestHeader));
- AddPart(req->Service().data(), req->Service().size());
- } else {
- TRequestHeader* hdr = new (AllocatePlus<TRequestHeader>(req->Service().size())) TRequestHeader(req->ReqId(), req->Service().size(), req->Data().size());
- AddPart(hdr, sizeof(TRequestHeader) + req->Service().size());
- memmove(++hdr, req->Service().data(), req->Service().size());
- }
- AddPart(req->Data().data(), req->Data().size());
- IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
- }
- void AddCancelRequest(TRequestId reqId) {
- TCancelHeader* hdr = new (Allocate<TCancelHeader>()) TCancelHeader(reqId);
- AddPart(hdr, sizeof(TCancelHeader));
- IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
- }
- void Clear() {
- TMultiBuffers::Clear();
- Requests_.clear();
- }
- private:
- TVector<TRequestRef> Requests_;
- };
- TConnection(TIOService& srv)
- : AS_(srv)
- , State_(Init)
- , BuffSize_(TTcp2Options::InputBufferSize)
- , Buff_(new char[BuffSize_])
- , NeedCheckReqsQueue_(0)
- , NeedCheckCancelsQueue_(0)
- , GenReqId_(0)
- , LastSendedReqId_(0)
- {
- }
- ~TConnection() override {
- try {
- DBGOUT("TClient::~TConnection()");
- OnError("~");
- } catch (...) {
- Cdbg << "tcp2::~cln_conn: " << CurrentExceptionMessage() << Endl;
- }
- }
- //called from client thread
- bool Run(TRequestRef& req) { // queues `req` for sending; returns false (nothing queued) only when the connection is already Closed
- if (Y_UNLIKELY(AtomicGet(State_) == Closed)) {
- return false;
- }
- req->Ref(); // the raw pointer placed in the queue carries its own reference; it is dropped by whoever dequeues it
- try {
- Reqs_.Enqueue(req.Get());
- } catch (...) {
- req->UnRef(); // enqueue failed: give the extra reference back
- throw;
- }
- AtomicSet(NeedCheckReqsQueue_, 1); // tells a concurrently running SendMessages() to re-check the queue
- req->SetConnection(this);
- TAtomicBase state = AtomicGet(State_);
- if (Y_LIKELY(state == Connected)) {
- ProcessOutputReqsQueue();
- return true;
- }
- if (state == Init) {
- if (AtomicCas(&State_, Connecting, Init)) { // presumably only the caller that switches Init -> Connecting starts the connect
- try {
- TEndpoint addr(new NAddr::TAddrInfo(&*req->Addr()->Addr.Begin())); // connect to the first resolved address of the host
- AS_.AsyncConnect(addr, std::bind(&TConnection::OnConnect, TConnectionRef(this), _1, _2), TTcp2Options::ConnectTimeout);
- } catch (...) {
- AS_.GetIOService().Post(std::bind(&TConnection::OnErrorCallback, TConnectionRef(this), CurrentExceptionMessage())); // report the failure through the io-service rather than from this thread
- }
- return true;
- }
- }
- state = AtomicGet(State_); // re-read: the state may have changed since the request was queued
- if (state == Connected) {
- ProcessOutputReqsQueue();
- } else if (state == Closed) {
- SafeOnError(); // connection already failed: fail the queued requests (including this one)
- }
- return true;
- }
- //called from client thread
- void Cancel(TRequestId id) {
- Cancels_.Enqueue(id);
- AtomicSet(NeedCheckCancelsQueue_, 1);
- if (Y_LIKELY(AtomicGet(State_) == Connected)) {
- ProcessOutputCancelsQueue();
- }
- }
- void ProcessOutputReqsQueue() {
- if (OutputLock_.TryAquire()) {
- SendMessages(false);
- }
- }
- void ProcessOutputCancelsQueue() {
- if (OutputLock_.TryAquire()) {
- AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
- return;
- }
- }
- //must be called only from asio thread
- void ProcessReqsInFlyQueue() {
- if (AtomicGet(State_) == Closed) {
- return;
- }
- TRequest* reqPtr;
- while (ReqsInFlyQueue_.Dequeue(&reqPtr)) {
- TRequestRef reqTmp(reqPtr);
- reqPtr->UnRef();
- ReqsInFly_[reqPtr->ReqId()].Swap(reqTmp);
- }
- }
- //must be called only from asio thread
- void OnConnect(const TErrorCode& ec, IHandlingContext&) {
- DBGOUT("TConnect::OnConnect: " << ec.Value());
- if (Y_UNLIKELY(ec)) {
- if (ec.Value() == EIO) {
- //try get more detail error info
- char buf[1];
- TErrorCode errConnect;
- AS_.ReadSome(buf, 1, errConnect);
- OnErrorCode(errConnect.Value() ? errConnect : ec);
- } else {
- OnErrorCode(ec);
- }
- } else {
- try {
- PrepareSocket(AS_.Native());
- AtomicSet(State_, Connected);
- AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2));
- if (OutputLock_.TryAquire()) {
- SendMessages(true);
- return;
- }
- } catch (...) {
- OnError(CurrentExceptionMessage());
- }
- }
- }
- //must be called only after succes aquiring output
- void SendMessages(bool asioThread) { // drains the cancel and request queues into OutputBuffers_ and flushes them; the caller must hold OutputLock_
- //DBGOUT("SendMessages");
- if (Y_UNLIKELY(AtomicGet(State_) == Closed)) {
- if (asioThread) {
- OnError(Error_);
- } else {
- SafeOnError(); // outside the asio thread only the queues are failed (OnError is asio-thread only)
- }
- return;
- }
- do {
- if (asioThread) {
- AtomicSet(NeedCheckCancelsQueue_, 0); // cleared before draining, so a cancel queued meanwhile raises the flag again
- TRequestId reqId;
- ProcessReqsInFlyQueue(); // bring ReqsInFly_ up to date before looking cancelled ids up in it
- while (Cancels_.Dequeue(&reqId)) {
- TReqsInFly::iterator it = ReqsInFly_.find(reqId);
- if (it == ReqsInFly_.end()) {
- continue; // id not found in ReqsInFly_: no cancel message is sent
- }
- ReqsInFly_.erase(it);
- OutputBuffers_.AddCancelRequest(reqId);
- if (Y_UNLIKELY(!OutputBuffers_.HasFreeSpace())) {
- if (!FlushOutputBuffers(asioThread, 0)) {
- return; // flush failed or continues asynchronously; OutputLock_ is not released here
- }
- }
- }
- } else if (AtomicGet(NeedCheckCancelsQueue_)) {
- AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true)); // cancels are handled only in the asio thread: hand the whole job over to it
- return;
- }
- TRequestId lastReqId = 0; // id of the last request added to the buffers in this pass
- {
- AtomicSet(NeedCheckReqsQueue_, 0);
- TRequest* reqPtr;
- while (Reqs_.Dequeue(&reqPtr)) {
- TRequestRef reqTmp(reqPtr);
- reqPtr->UnRef(); // drop the reference taken when the pointer was enqueued in Run()
- reqPtr->SetReqId(GenerateReqId());
- if (reqPtr->Canceled()) {
- continue; // cancelled before sending: skipped
- }
- lastReqId = reqPtr->ReqId();
- if (asioThread) {
- TRequestRef& req = ReqsInFly_[(TRequestId)reqPtr->ReqId()];
- req.Swap(reqTmp);
- OutputBuffers_.AddRequest(req);
- } else { //can access to ReqsInFly_ only from asio thread, so enqueue req to update ReqsInFly_ queue
- try {
- reqTmp->Ref(); // reference owned by the raw pointer stored in ReqsInFlyQueue_
- ReqsInFlyQueue_.Enqueue(reqPtr);
- } catch (...) {
- reqTmp->UnRef();
- throw;
- }
- OutputBuffers_.AddRequest(reqTmp);
- }
- if (Y_UNLIKELY(!OutputBuffers_.HasFreeSpace())) {
- if (!FlushOutputBuffers(asioThread, lastReqId)) {
- return;
- }
- }
- }
- }
- if (OutputBuffers_.HasData()) {
- if (!FlushOutputBuffers(asioThread, lastReqId)) {
- return;
- }
- }
- OutputLock_.Release();
- if (!AtomicGet(NeedCheckReqsQueue_) && !AtomicGet(NeedCheckCancelsQueue_)) { // nothing was queued while the lock was held: done
- DBGOUT("TClient::SendMessages(exit2)");
- return;
- }
- } while (OutputLock_.TryAquire()); // new work was flagged: loop again if the lock can be re-taken
- DBGOUT("TClient::SendMessages(exit1)");
- }
- TRequestId GenerateReqId() noexcept {
- TRequestId reqId;
- {
- auto guard = Guard(GenReqIdLock_);
- reqId = ++GenReqId_;
- }
- return Y_LIKELY(reqId) ? reqId : GenerateReqId();
- }
- //called non thread-safe (from outside thread)
- bool FlushOutputBuffers(bool asioThread, TRequestId reqId) {
- if (asioThread || TTcp2Options::ClientUseDirectWrite) {
- TContIOVector& vec = *OutputBuffers_.GetIOvec();
- TErrorCode err;
- vec.Proceed(AS_.WriteSome(vec, err));
- if (Y_UNLIKELY(err)) {
- if (asioThread) {
- OnErrorCode(err);
- } else {
- AS_.GetIOService().Post(std::bind(&TConnection::OnErrorCode, TConnectionRef(this), err));
- }
- return false;
- }
- if (vec.Complete()) {
- LastSendedReqId_.store(reqId, std::memory_order_release);
- DBGOUT("Client::FlushOutputBuffers(" << reqId << ")");
- OutputBuffers_.Clear();
- return true;
- }
- }
- DBGOUT("Client::AsyncWrite(" << reqId << ")");
- AS_.AsyncWrite(OutputBuffers_.GetIOvec(), std::bind(&TConnection::OnSend, TConnectionRef(this), reqId, _1, _2, _3), TTcp2Options::ServerOutputDeadline);
- return false;
- }
- //must be called only from asio thread
- void OnSend(TRequestId reqId, const TErrorCode& ec, size_t amount, IHandlingContext&) {
- Y_UNUSED(amount);
- if (Y_UNLIKELY(ec)) {
- OnErrorCode(ec);
- } else {
- if (Y_LIKELY(reqId)) {
- DBGOUT("Client::OnSend(" << reqId << ")");
- LastSendedReqId_.store(reqId, std::memory_order_release);
- }
- //output already aquired, used asio thread
- OutputBuffers_.Clear();
- SendMessages(true);
- }
- }
- //must be called only from asio thread
- void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
- //DBGOUT("OnCanRead(" << ec.Value() << ")");
- if (Y_UNLIKELY(ec)) {
- OnErrorCode(ec);
- } else {
- TErrorCode ec2;
- OnReadSome(ec2, AS_.ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
- }
- }
- //must be called only from asio thread
- void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
- //DBGOUT("OnReadSome(" << ec.Value() << ", " << amount << ")");
- if (Y_UNLIKELY(ec)) {
- OnErrorCode(ec);
- return;
- }
- while (1) {
- if (Y_UNLIKELY(!amount)) {
- OnError("tcp conn. closed");
- return;
- }
- try {
- const char* buff = Buff_.Get();
- size_t leftBytes = amount;
- do {
- size_t useBytes = Msg_.LoadFrom(buff, leftBytes);
- leftBytes -= useBytes;
- buff += useBytes;
- if (Msg_.IsComplete()) {
- //DBGOUT("OnReceiveMessage(" << Msg_.BaseHeader().Id << "): " << leftBytes);
- OnReceiveMessage();
- Msg_.Clear();
- }
- } while (leftBytes);
- if (amount == BuffSize_) {
- //try decrease system calls, - re-run ReadSome if has full filled buffer
- TErrorCode ecR;
- amount = AS_.ReadSome(Buff_.Get(), BuffSize_, ecR);
- if (!ecR) {
- continue; //process next input data
- }
- if (ecR.Value() == EAGAIN || ecR.Value() == EWOULDBLOCK) {
- ctx.ContinueUseHandler();
- } else {
- OnErrorCode(ec);
- }
- } else {
- ctx.ContinueUseHandler();
- }
- } catch (...) {
- OnError(CurrentExceptionMessage());
- }
- return;
- }
- }
- //must be called only from asio thread
- void OnErrorCode(TErrorCode ec) {
- OnError(ec.Text(), ec.Value());
- }
- //must be called only from asio thread
- void OnErrorCallback(TString err) {
- OnError(err);
- }
- //must be called only from asio thread
- void OnError(const TString& err, const i32 systemCode = 0) {
- if (AtomicGet(State_) != Closed) {
- Error_ = err;
- SystemCode_ = systemCode;
- AtomicSet(State_, Closed);
- AS_.AsyncCancel();
- }
- SafeOnError();
- for (auto& it : ReqsInFly_) {
- it.second->OnError(err);
- }
- ReqsInFly_.clear();
- }
- void SafeOnError() {
- TRequest* reqPtr;
- while (Reqs_.Dequeue(&reqPtr)) {
- TRequestRef req(reqPtr);
- reqPtr->UnRef();
- //DBGOUT("err queue(" << AS_.Native() << "):" << size_t(reqPtr));
- req->OnError(Error_, SystemCode_);
- }
- while (ReqsInFlyQueue_.Dequeue(&reqPtr)) {
- TRequestRef req(reqPtr);
- reqPtr->UnRef();
- //DBGOUT("err fly queue(" << AS_.Native() << "):" << size_t(reqPtr));
- req->OnError(Error_, SystemCode_);
- }
- }
- //must be called only from asio thread
- void OnReceiveMessage() {
- //DBGOUT("OnReceiveMessage");
- const TBaseHeader& hdr = Msg_.BaseHeader();
- if (hdr.Type == TBaseHeader::Response) {
- ProcessReqsInFlyQueue();
- TReqsInFly::iterator it = ReqsInFly_.find(hdr.Id);
- if (it == ReqsInFly_.end()) {
- DBGOUT("ignore response: " << hdr.Id);
- return;
- }
- it->second->OnResponse(Msg_);
- ReqsInFly_.erase(it);
- } else {
- throw yexception() << TStringBuf("unsupported message type: ") << hdr.Type;
- }
- }
- TRequestId LastSendedRequestId() const noexcept {
- return LastSendedReqId_.load(std::memory_order_acquire);
- }
- private:
- NAsio::TTcpSocket AS_;
- TAtomic State_; //state machine status (TState)
- TString Error_; // error text stored by the first OnError() call
- i32 SystemCode_ = 0; // system code stored together with Error_
- //input
- size_t BuffSize_; // size of Buff_, taken from TTcp2Options::InputBufferSize
- TArrayHolder<char> Buff_;
- TTcp2Message Msg_; // message currently being parsed from the input
- //output
- TOutputLock OutputLock_; // must be held while running SendMessages()
- TAtomic NeedCheckReqsQueue_; // set after Reqs_.Enqueue(), cleared by SendMessages() before draining
- TLockFreeQueue<TRequest*> Reqs_; // requests waiting to be sent; each stored pointer holds a reference
- TAtomic NeedCheckCancelsQueue_; // set after Cancels_.Enqueue(), cleared by SendMessages() in the asio thread
- TLockFreeQueue<TRequestId> Cancels_;
- TAdaptiveLock GenReqIdLock_; // held while GenReqId_ is incremented
- std::atomic<TRequestId> GenReqId_;
- std::atomic<TRequestId> LastSendedReqId_;
- TLockFreeQueue<TRequest*> ReqsInFlyQueue_; // requests sent from non-asio threads, waiting to be moved into ReqsInFly_
- TReqsInFly ReqsInFly_; // accessed only from the asio thread
- TOutputBuffers OutputBuffers_;
- };
- class TDestination {
- public:
- void Run(TRequestRef& req) {
- while (1) {
- TConnectionRef conn = GetConnection();
- if (!!conn && conn->Run(req)) {
- return;
- }
- DBGOUT("TDestination CreateConnection");
- CreateConnection(conn, req->Client().ExecutorsPool().GetExecutor().GetIOService());
- }
- }
- private:
- TConnectionRef GetConnection() {
- TGuard<TSpinLock> g(L_);
- return Conn_;
- }
- void CreateConnection(TConnectionRef& oldConn, TIOService& srv) {
- TConnectionRef conn(new TConnection(srv));
- TGuard<TSpinLock> g(L_);
- if (Conn_ == oldConn) {
- Conn_.Swap(conn);
- }
- }
- TSpinLock L_;
- TConnectionRef Conn_;
- };
- //////////// TClient /////////
- public:
- TClient()
- : EP_(TTcp2Options::AsioClientThreads)
- {
- }
- ~TClient() {
- EP_.SyncShutdown();
- }
- THandleRef Schedule(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) {
- //find exist connection or create new
- TRequest::THandleRef hndl(new TRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
- try {
- TRequest::Run(hndl, msg, *this);
- } catch (...) {
- hndl->ResetOnRecv();
- hndl->ReleaseRequest();
- throw;
- }
- return hndl.Get();
- }
- TExecutorsPool& ExecutorsPool() {
- return EP_;
- }
- private:
- NNeh::NHttp::TLockFreeSequence<TDestination> Dest_;
- TExecutorsPool EP_;
- };
- ////////// server side ////////////////////////////////////////////////////////////////////////////////////////////
- class TServer: public IRequester {
- typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
- typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
- class TConnection;
- typedef TIntrusivePtr<TConnection> TConnectionRef;
- struct TRequest: public IRequest {
- struct TState: public TThrRefBase {
- TState()
- : Canceled(false)
- {
- }
- TAtomicBool Canceled;
- };
- typedef TIntrusivePtr<TState> TStateRef;
- TRequest(const TConnectionRef& conn, TBuffer& buf, const TString& content);
- ~TRequest() override;
- TStringBuf Scheme() const override {
- return TStringBuf("tcp2");
- }
- TString RemoteHost() const override;
- TStringBuf Service() const override {
- return TStringBuf(Buf.Data() + sizeof(TRequestHeader), Buf.End());
- }
- TStringBuf Data() const override {
- return TStringBuf(Content_);
- }
- TStringBuf RequestId() const override {
- return TStringBuf();
- }
- bool Canceled() const override {
- return State->Canceled;
- }
- void SendReply(TData& data) override;
- void SendError(TResponseError, const TString&) override {
- // TODO
- }
- const TRequestHeader& RequestHeader() const noexcept {
- return *reinterpret_cast<const TRequestHeader*>(Buf.Data());
- }
- private:
- TConnectionRef Conn;
- TBuffer Buf; //service-name + message-data
- TString Content_;
- TAtomic Replied_;
- public:
- TIntrusivePtr<TState> State;
- };
- class TConnection: public TThrRefBase {
- private:
- TConnection(TServer& srv, const TTcpSocketRef& sock)
- : Srv_(srv)
- , AS_(sock)
- , Canceled_(false)
- , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
- , BuffSize_(TTcp2Options::InputBufferSize)
- , Buff_(new char[BuffSize_])
- , NeedCheckOutputQueue_(0)
- {
- DBGOUT("TServer::TConnection()");
- }
- public:
- class TOutputBuffers: public TMultiBuffers {
- public:
- void AddResponse(TRequestId reqId, TData& data) {
- TResponseHeader* hdr = new (Allocate<TResponseHeader>()) TResponseHeader(reqId, TResponseHeader::Success, data.size());
- ResponseData_.push_back(TAutoPtr<TData>(new TData()));
- TData& movedData = *ResponseData_.back();
- movedData.swap(data);
- AddPart(hdr, sizeof(TResponseHeader));
- AddPart(movedData.data(), movedData.size());
- IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
- }
- void AddError(TRequestId reqId, TResponseHeader::TErrorCode errCode) {
- TResponseHeader* hdr = new (Allocate<TResponseHeader>()) TResponseHeader(reqId, errCode, 0);
- AddPart(hdr, sizeof(TResponseHeader));
- IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
- }
- void Clear() {
- TMultiBuffers::Clear();
- ResponseData_.clear();
- }
- private:
- TVector<TAutoPtr<TData>> ResponseData_;
- };
- static void Create(TServer& srv, const TTcpSocketRef& sock) {
- TConnectionRef conn(new TConnection(srv, sock));
- conn->AS_->AsyncPollRead(std::bind(&TConnection::OnCanRead, conn, _1, _2), TTcp2Options::ServerInputDeadline);
- }
- ~TConnection() override {
- DBGOUT("~TServer::TConnection(" << (!AS_ ? -666 : AS_->Native()) << ")");
- }
- private:
- void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
- if (ec) {
- OnError();
- } else {
- TErrorCode ec2;
- OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
- }
- }
- void OnError() {
- DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")"
- << " c=" << (size_t)this);
- Canceled_ = true;
- AS_->AsyncCancel();
- }
- void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
- while (1) {
- if (ec || !amount) {
- OnError();
- return;
- }
- try {
- const char* buff = Buff_.Get();
- size_t leftBytes = amount;
- do {
- size_t useBytes = Msg_.LoadFrom(buff, leftBytes);
- leftBytes -= useBytes;
- buff += useBytes;
- if (Msg_.IsComplete()) {
- OnReceiveMessage();
- }
- } while (leftBytes);
- if (amount == BuffSize_) {
- //try decrease system calls, - re-run ReadSome if has full filled buffer
- TErrorCode ecR;
- amount = AS_->ReadSome(Buff_.Get(), BuffSize_, ecR);
- if (!ecR) {
- continue;
- }
- if (ecR.Value() == EAGAIN || ecR.Value() == EWOULDBLOCK) {
- ctx.ContinueUseHandler();
- } else {
- OnError();
- }
- } else {
- ctx.ContinueUseHandler();
- }
- } catch (...) {
- DBGOUT("exc. " << CurrentExceptionMessage());
- OnError();
- }
- return;
- }
- }
- void OnReceiveMessage() {
- DBGOUT("OnReceiveMessage()");
- const TBaseHeader& hdr = Msg_.BaseHeader();
- if (hdr.Type == TBaseHeader::Request) {
- TRequest* reqPtr = new TRequest(TConnectionRef(this), Msg_.Header(), Msg_.Content());
- IRequestRef req(reqPtr);
- ReqsState_[reqPtr->RequestHeader().Id] = reqPtr->State;
- OnRequest(req);
- } else if (hdr.Type == TBaseHeader::Cancel) {
- OnCancelRequest(hdr.Id);
- } else {
- throw yexception() << "unsupported message type: " << (ui32)hdr.Type;
- }
- Msg_.Clear();
- {
- TRequestId reqId;
- while (FinReqs_.Dequeue(&reqId)) {
- ReqsState_.erase(reqId);
- }
- }
- }
- void OnRequest(IRequestRef& r) {
- DBGOUT("OnRequest()");
- Srv_.OnRequest(r);
- }
- void OnCancelRequest(TRequestId reqId) {
- THashMap<TRequestId, TRequest::TStateRef>::iterator it = ReqsState_.find(reqId);
- if (it == ReqsState_.end()) {
- return;
- }
- it->second->Canceled = true;
- }
- public:
- class TOutputData {
- public:
- TOutputData(TRequestId reqId)
- : ReqId(reqId)
- {
- }
- virtual ~TOutputData() {
- }
- virtual void MoveTo(TOutputBuffers& bufs) = 0;
- TRequestId ReqId;
- };
- class TResponseData: public TOutputData {
- public:
- TResponseData(TRequestId reqId, TData& data)
- : TOutputData(reqId)
- {
- Data.swap(data);
- }
- void MoveTo(TOutputBuffers& bufs) override {
- bufs.AddResponse(ReqId, Data);
- }
- TData Data;
- };
- class TResponseErrorData: public TOutputData {
- public:
- TResponseErrorData(TRequestId reqId, TResponseHeader::TErrorCode errorCode)
- : TOutputData(reqId)
- , ErrorCode(errorCode)
- {
- }
- void MoveTo(TOutputBuffers& bufs) override {
- bufs.AddError(ReqId, ErrorCode);
- }
- TResponseHeader::TErrorCode ErrorCode;
- };
- //called non thread-safe (from client thread)
- void SendResponse(TRequestId reqId, TData& data) {
- DBGOUT("SendResponse: " << reqId << " " << (size_t)~data << " c=" << (size_t)this);
- TAutoPtr<TOutputData> od(new TResponseData(reqId, data));
- OutputData_.Enqueue(od);
- ProcessOutputQueue();
- }
- //called non thread-safe (from outside thread)
- void SendError(TRequestId reqId, TResponseHeader::TErrorCode err) {
- DBGOUT("SendResponseError: " << reqId << " c=" << (size_t)this);
- TAutoPtr<TOutputData> od(new TResponseErrorData(reqId, err));
- OutputData_.Enqueue(od);
- ProcessOutputQueue();
- }
- void ProcessOutputQueue() {
- AtomicSet(NeedCheckOutputQueue_, 1);
- if (OutputLock_.TryAquire()) {
- SendMessages(false);
- return;
- }
- DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree());
- }
- //must be called only after success aquiring output
- void SendMessages(bool asioThread) {
- DBGOUT("TServer::SendMessages(enter)");
- try {
- do {
- AtomicUnlock(&NeedCheckOutputQueue_);
- TAutoPtr<TOutputData> d;
- while (OutputData_.Dequeue(&d)) {
- d->MoveTo(OutputBuffers_);
- if (!OutputBuffers_.HasFreeSpace()) {
- if (!FlushOutputBuffers(asioThread)) {
- return;
- }
- }
- }
- if (OutputBuffers_.HasData()) {
- if (!FlushOutputBuffers(asioThread)) {
- return;
- }
- }
- OutputLock_.Release();
- if (!AtomicGet(NeedCheckOutputQueue_)) {
- DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree());
- return;
- }
- } while (OutputLock_.TryAquire());
- DBGOUT("Server::SendMessages(exit1)");
- } catch (...) {
- OnError();
- }
- }
- bool FlushOutputBuffers(bool asioThread) {
- DBGOUT("FlushOutputBuffers: cnt=" << OutputBuffers_.GetIOvec()->Count() << " c=" << (size_t)this);
- //TODO:reseach direct write efficiency
- if (asioThread || TTcp2Options::ServerUseDirectWrite) {
- TContIOVector& vec = *OutputBuffers_.GetIOvec();
- vec.Proceed(AS_->WriteSome(vec));
- if (vec.Complete()) {
- OutputBuffers_.Clear();
- //DBGOUT("WriteResponse: " << " c=" << (size_t)this);
- return true;
- }
- }
- //socket buffer filled - use async write for sending left data
- DBGOUT("AsyncWriteResponse: "
- << " [" << OutputBuffers_.GetIOvec()->Bytes() << "]"
- << " c=" << (size_t)this);
- AS_->AsyncWrite(OutputBuffers_.GetIOvec(), std::bind(&TConnection::OnSend, TConnectionRef(this), _1, _2, _3), TTcp2Options::ServerOutputDeadline);
- return false;
- }
- void OnFinishRequest(TRequestId reqId) {
- if (Y_LIKELY(!Canceled_)) {
- FinReqs_.Enqueue(reqId);
- }
- }
- private:
- void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
- Y_UNUSED(amount);
- DBGOUT("TServer::OnSend(" << ec.Value() << ", " << amount << ")");
- if (ec) {
- OnError();
- } else {
- OutputBuffers_.Clear();
- SendMessages(true);
- }
- }
- public:
- bool IsCanceled() const noexcept {
- return Canceled_;
- }
- const TString& RemoteHost() const noexcept {
- return RemoteHost_;
- }
- private:
- TServer& Srv_;
- TTcpSocketRef AS_;
- NAtomic::TBool Canceled_;
- TString RemoteHost_;
- //input
- size_t BuffSize_;
- TArrayHolder<char> Buff_;
- TTcp2Message Msg_;
- THashMap<TRequestId, TRequest::TStateRef> ReqsState_;
- TLockFreeQueue<TRequestId> FinReqs_;
- //output
- TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
- TAtomic NeedCheckOutputQueue_;
- NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
- TOutputBuffers OutputBuffers_;
- };
- //////////// TServer /////////
- public:
- TServer(IOnRequest* cb, ui16 port)
- : EP_(TTcp2Options::AsioServerThreads)
- , CB_(cb)
- {
- TNetworkAddress addr(port);
- for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
- TEndpoint ep(new NAddr::TAddrInfo(&*it));
- TTcpAcceptorPtr a(new TTcpAcceptor(EA_.GetIOService()));
- //DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
- a->Bind(ep);
- a->Listen(TTcp2Options::Backlog);
- StartAccept(a.Get());
- A_.push_back(a);
- }
- }
- ~TServer() override {
- EA_.SyncShutdown(); //cancel accepting connections
- A_.clear(); //stop listening
- EP_.SyncShutdown(); //close all exist connections
- }
- void StartAccept(TTcpAcceptor* a) {
- const auto s = MakeAtomicShared<TTcpSocket>(EP_.Size() ? EP_.GetExecutor().GetIOService() : EA_.GetIOService());
- a->AsyncAccept(*s, std::bind(&TServer::OnAccept, this, a, s, _1, _2));
- }
- void OnAccept(TTcpAcceptor* a, TTcpSocketRef s, const TErrorCode& ec, IHandlingContext&) {
- if (Y_UNLIKELY(ec)) {
- if (ec.Value() == ECANCELED) {
- return;
- } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
- //reach some os limit, suspend accepting for preventing busyloop (100% cpu usage)
- TSimpleSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
- dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&TServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
- } else {
- Cdbg << "acc: " << ec.Text() << Endl;
- }
- } else {
- SetNonBlock(s->Native());
- PrepareSocket(s->Native());
- TConnection::Create(*this, s);
- }
- StartAccept(a); //continue accepting
- }
- void OnTimeoutSuspendAccept(TTcpAcceptor* a, TSimpleSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
- if (!ec) {
- DBGOUT("resume acceptor");
- StartAccept(a);
- }
- }
- void OnRequest(IRequestRef& r) {
- try {
- CB_->OnRequest(r);
- } catch (...) {
- Cdbg << CurrentExceptionMessage() << Endl;
- }
- }
- private:
- TVector<TTcpAcceptorPtr> A_;
- TIOServiceExecutor EA_; //thread, where accepted incoming tcp connections
- TExecutorsPool EP_; //threads, for process write/read data to/from tcp connections (if empty, use EA_ for r/w)
- IOnRequest* CB_;
- };
- TServer::TRequest::TRequest(const TConnectionRef& conn, TBuffer& buf, const TString& content)
- : Conn(conn)
- , Content_(content)
- , Replied_(0)
- , State(new TState())
- {
- DBGOUT("TServer::TRequest()");
- Buf.Swap(buf);
- }
- TServer::TRequest::~TRequest() {
- DBGOUT("TServer::~TRequest()");
- if (!AtomicGet(Replied_)) {
- Conn->SendError(RequestHeader().Id, TResponseHeader::EmptyReply);
- }
- Conn->OnFinishRequest(RequestHeader().Id);
- }
- TString TServer::TRequest::RemoteHost() const {
- return Conn->RemoteHost();
- }
- void TServer::TRequest::SendReply(TData& data) {
- do {
- if (AtomicCas(&Replied_, 1, 0)) {
- Conn->SendResponse(RequestHeader().Id, data);
- return;
- }
- } while (AtomicGet(Replied_) == 0);
- }
- class TProtocol: public IProtocol {
- public:
- inline TProtocol() {
- InitNetworkSubSystem();
- }
- IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
- return new TServer(cb, loc.GetPort());
- }
- THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
- return Singleton<TClient>()->Schedule(msg, fallback, ss);
- }
- TStringBuf Scheme() const noexcept override {
- return TStringBuf("tcp2");
- }
- bool SetOption(TStringBuf name, TStringBuf value) override {
- return TTcp2Options::Set(name, value);
- }
- };
- }
- }
- NNeh::IProtocol* NNeh::Tcp2Protocol() {
- return Singleton<NNehTcp2::TProtocol>();
- }
|