tcp2.cpp 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656
  1. #include "tcp2.h"
  2. #include "details.h"
  3. #include "factory.h"
  4. #include "http_common.h"
  5. #include "neh.h"
  6. #include "utils.h"
  7. #include <library/cpp/dns/cache.h>
  8. #include <library/cpp/neh/asio/executor.h>
  9. #include <library/cpp/threading/atomic/bool.h>
  10. #include <util/generic/buffer.h>
  11. #include <util/generic/hash.h>
  12. #include <util/generic/singleton.h>
  13. #include <util/network/endpoint.h>
  14. #include <util/network/init.h>
  15. #include <util/network/iovec.h>
  16. #include <util/network/socket.h>
  17. #include <util/string/cast.h>
  18. #include <atomic>
  //#define DEBUG_TCP2 1
  #ifdef DEBUG_TCP2
  // Serializes debug lines written concurrently by client and asio threads.
  TSpinLock OUT_LOCK;
  // Prints "<TInstant::Now().GetValue()> <args>" to Cout. The do { } while (0) wrapper makes the
  // expansion a single statement, so `if (c) DBGOUT(x); else ...` compiles and a trailing ';' is required.
  #define DBGOUT(args)                                               \
      do {                                                           \
          TGuard<TSpinLock> m(OUT_LOCK);                             \
          Cout << TInstant::Now().GetValue() << " " << args << Endl; \
      } while (0)
  #else
  // Tracing disabled: DBGOUT(...) expands to nothing and its arguments are not evaluated.
  #define DBGOUT(args)
  #endif
  // _1, _2, ... used by the std::bind callbacks in this file.
  using namespace std::placeholders;
  31. namespace NNeh {
  32. TDuration TTcp2Options::ConnectTimeout = TDuration::MilliSeconds(300);
  33. size_t TTcp2Options::InputBufferSize = 16000;
  34. size_t TTcp2Options::AsioClientThreads = 4;
  35. size_t TTcp2Options::AsioServerThreads = 4;
  36. int TTcp2Options::Backlog = 100;
  37. bool TTcp2Options::ClientUseDirectWrite = true;
  38. bool TTcp2Options::ServerUseDirectWrite = true;
  39. TDuration TTcp2Options::ServerInputDeadline = TDuration::Seconds(3600);
  40. TDuration TTcp2Options::ServerOutputDeadline = TDuration::Seconds(10);
  41. bool TTcp2Options::Set(TStringBuf name, TStringBuf value) {
  42. #define TCP2_TRY_SET(optType, optName) \
  43. if (name == TStringBuf(#optName)) { \
  44. optName = FromString<optType>(value); \
  45. }
  46. TCP2_TRY_SET(TDuration, ConnectTimeout)
  47. else TCP2_TRY_SET(size_t, InputBufferSize) else TCP2_TRY_SET(size_t, AsioClientThreads) else TCP2_TRY_SET(size_t, AsioServerThreads) else TCP2_TRY_SET(int, Backlog) else TCP2_TRY_SET(bool, ClientUseDirectWrite) else TCP2_TRY_SET(bool, ServerUseDirectWrite) else TCP2_TRY_SET(TDuration, ServerInputDeadline) else TCP2_TRY_SET(TDuration, ServerOutputDeadline) else {
  48. return false;
  49. }
  50. return true;
  51. }
  52. }
  53. namespace {
  54. namespace NNehTcp2 {
  55. using namespace NAsio;
  56. using namespace NDns;
  57. using namespace NNeh;
  58. const TString canceled = "canceled";
  59. const TString emptyReply = "empty reply";
  60. inline void PrepareSocket(SOCKET s) {
  61. SetNoDelay(s, true);
  62. }
  63. typedef ui64 TRequestId;
  64. #pragma pack(push, 1) //disable align struct members (structs mapped to data transmitted other network)
  65. struct TBaseHeader {
  66. enum TMessageType {
  67. Request = 1,
  68. Response = 2,
  69. Cancel = 3,
  70. MaxMessageType
  71. };
  72. TBaseHeader(TRequestId id, ui32 headerLength, ui8 version, ui8 mType)
  73. : Id(id)
  74. , HeaderLength(headerLength)
  75. , Version(version)
  76. , Type(mType)
  77. {
  78. }
  79. TRequestId Id; //message id, - monotonic inc. sequence (skip nil value)
  80. ui32 HeaderLength;
  81. ui8 Version; //current version: 1
  82. ui8 Type; //<- TMessageType (+ in future possible ForceResponse,etc)
  83. };
  84. struct TRequestHeader: public TBaseHeader {
  85. TRequestHeader(TRequestId reqId, size_t servicePathLength, size_t dataSize)
  86. : TBaseHeader(reqId, sizeof(TRequestHeader) + servicePathLength, 1, (ui8)Request)
  87. , ContentLength(dataSize)
  88. {
  89. }
  90. ui32 ContentLength;
  91. };
  92. struct TResponseHeader: public TBaseHeader {
  93. enum TErrorCode {
  94. Success = 0,
  95. EmptyReply = 1 //not found such service or service not sent response
  96. ,
  97. MaxErrorCode
  98. };
  99. TResponseHeader(TRequestId reqId, TErrorCode code, size_t dataSize)
  100. : TBaseHeader(reqId, sizeof(TResponseHeader), 1, (ui8)Response)
  101. , ErrorCode((ui16)code)
  102. , ContentLength(dataSize)
  103. {
  104. }
  105. TString ErrorDescription() const {
  106. if (ErrorCode == (ui16)EmptyReply) {
  107. return emptyReply;
  108. }
  109. TStringStream ss;
  110. ss << TStringBuf("tcp2 err_code=") << ErrorCode;
  111. return ss.Str();
  112. }
  113. ui16 ErrorCode;
  114. ui32 ContentLength;
  115. };
  116. struct TCancelHeader: public TBaseHeader {
  117. TCancelHeader(TRequestId reqId)
  118. : TBaseHeader(reqId, sizeof(TCancelHeader), 1, (ui8)Cancel)
  119. {
  120. }
  121. };
  122. #pragma pack(pop)
  123. static const size_t maxHeaderSize = sizeof(TResponseHeader);
  124. //buffer for read input data, - header + message data
  125. struct TTcp2Message {
  126. TTcp2Message()
  127. : Loader_(&TTcp2Message::LoadBaseHeader)
  128. , RequireBytesForComplete_(sizeof(TBaseHeader))
  129. , Header_(sizeof(TBaseHeader))
  130. {
  131. }
  132. void Clear() {
  133. Loader_ = &TTcp2Message::LoadBaseHeader;
  134. RequireBytesForComplete_ = sizeof(TBaseHeader);
  135. Header_.Clear();
  136. Content_.clear();
  137. }
  138. TBuffer& Header() noexcept {
  139. return Header_;
  140. }
  141. const TString& Content() const noexcept {
  142. return Content_;
  143. }
  144. bool IsComplete() const noexcept {
  145. return RequireBytesForComplete_ == 0;
  146. }
  147. size_t LoadFrom(const char* buf, size_t len) {
  148. return (this->*Loader_)(buf, len);
  149. }
  150. const TBaseHeader& BaseHeader() const {
  151. return *reinterpret_cast<const TBaseHeader*>(Header_.Data());
  152. }
  153. const TRequestHeader& RequestHeader() const {
  154. return *reinterpret_cast<const TRequestHeader*>(Header_.Data());
  155. }
  156. const TResponseHeader& ResponseHeader() const {
  157. return *reinterpret_cast<const TResponseHeader*>(Header_.Data());
  158. }
  159. private:
  160. size_t LoadBaseHeader(const char* buf, size_t len) {
  161. size_t useBytes = Min<size_t>(sizeof(TBaseHeader) - Header_.Size(), len);
  162. Header_.Append(buf, useBytes);
  163. if (Y_UNLIKELY(sizeof(TBaseHeader) > Header_.Size())) {
  164. //base header yet not complete
  165. return useBytes;
  166. }
  167. {
  168. const TBaseHeader& hdr = BaseHeader();
  169. if (BaseHeader().HeaderLength > 32000) { //some heuristic header size limit
  170. throw yexception() << TStringBuf("to large neh/tcp2 header size: ") << BaseHeader().HeaderLength;
  171. }
  172. //header completed
  173. Header_.Reserve(hdr.HeaderLength);
  174. }
  175. const TBaseHeader& hdr = BaseHeader(); //reallocation can move Header_ data to another place, so use fresh 'hdr'
  176. if (Y_UNLIKELY(hdr.Version != 1)) {
  177. throw yexception() << TStringBuf("unsupported protocol version: ") << static_cast<unsigned>(hdr.Version);
  178. }
  179. RequireBytesForComplete_ = hdr.HeaderLength - sizeof(TBaseHeader);
  180. return useBytes + LoadHeader(buf + useBytes, len - useBytes);
  181. }
  182. size_t LoadHeader(const char* buf, size_t len) {
  183. size_t useBytes = Min<size_t>(RequireBytesForComplete_, len);
  184. Header_.Append(buf, useBytes);
  185. RequireBytesForComplete_ -= useBytes;
  186. if (RequireBytesForComplete_) {
  187. //continue load header
  188. Loader_ = &TTcp2Message::LoadHeader;
  189. return useBytes;
  190. }
  191. const TBaseHeader& hdr = *reinterpret_cast<const TBaseHeader*>(Header_.Data());
  192. if (hdr.Type == TBaseHeader::Request) {
  193. if (Header_.Size() < sizeof(TRequestHeader)) {
  194. throw yexception() << TStringBuf("invalid request header size");
  195. }
  196. InitContentLoading(RequestHeader().ContentLength);
  197. } else if (hdr.Type == TBaseHeader::Response) {
  198. if (Header_.Size() < sizeof(TResponseHeader)) {
  199. throw yexception() << TStringBuf("invalid response header size");
  200. }
  201. InitContentLoading(ResponseHeader().ContentLength);
  202. } else if (hdr.Type == TBaseHeader::Cancel) {
  203. if (Header_.Size() < sizeof(TCancelHeader)) {
  204. throw yexception() << TStringBuf("invalid cancel header size");
  205. }
  206. return useBytes;
  207. } else {
  208. throw yexception() << TStringBuf("unsupported request type: ") << static_cast<unsigned>(hdr.Type);
  209. }
  210. return useBytes + (this->*Loader_)(buf + useBytes, len - useBytes);
  211. }
  212. void InitContentLoading(size_t contentLength) {
  213. RequireBytesForComplete_ = contentLength;
  214. Content_.ReserveAndResize(contentLength);
  215. Loader_ = &TTcp2Message::LoadContent;
  216. }
  217. size_t LoadContent(const char* buf, size_t len) {
  218. size_t curContentSize = Content_.size() - RequireBytesForComplete_;
  219. size_t useBytes = Min<size_t>(RequireBytesForComplete_, len);
  220. memcpy(Content_.begin() + curContentSize, buf, useBytes);
  221. RequireBytesForComplete_ -= useBytes;
  222. return useBytes;
  223. }
  224. private:
  225. typedef size_t (TTcp2Message::*TLoader)(const char*, size_t);
  226. TLoader Loader_; //current loader (stages - base-header/header/content)
  227. size_t RequireBytesForComplete_;
  228. TBuffer Header_;
  229. TString Content_;
  230. };
  231. //base storage for output data
  232. class TMultiBuffers {
  233. public:
  234. TMultiBuffers()
  235. : IOVec_(nullptr, 0)
  236. , DataSize_(0)
  237. , PoolBytes_(0)
  238. {
  239. }
  240. void Clear() noexcept {
  241. Parts_.clear();
  242. DataSize_ = 0;
  243. PoolBytes_ = 0;
  244. }
  245. bool HasFreeSpace() const noexcept {
  246. return DataSize_ < 64000 && (PoolBytes_ < (MemPoolSize_ - maxHeaderSize));
  247. }
  248. bool HasData() const noexcept {
  249. return Parts_.size();
  250. }
  251. TContIOVector* GetIOvec() noexcept {
  252. return &IOVec_;
  253. }
  254. protected:
  255. void AddPart(const void* buf, size_t len) {
  256. Parts_.push_back(IOutputStream::TPart(buf, len));
  257. DataSize_ += len;
  258. }
  259. //used for allocate header (MUST be POD type)
  260. template <typename T>
  261. inline T* Allocate() noexcept {
  262. size_t poolBytes = PoolBytes_;
  263. PoolBytes_ += sizeof(T);
  264. return (T*)(MemPool_ + poolBytes);
  265. }
  266. //used for allocate header (MUST be POD type) + some tail
  267. template <typename T>
  268. inline T* AllocatePlus(size_t tailSize) noexcept {
  269. Y_ASSERT(tailSize <= MemPoolReserve_);
  270. size_t poolBytes = PoolBytes_;
  271. PoolBytes_ += sizeof(T) + tailSize;
  272. return (T*)(MemPool_ + poolBytes);
  273. }
  274. protected:
  275. TContIOVector IOVec_;
  276. TVector<IOutputStream::TPart> Parts_;
  277. static const size_t MemPoolSize_ = maxHeaderSize * 100;
  278. static const size_t MemPoolReserve_ = 32;
  279. size_t DataSize_;
  280. size_t PoolBytes_;
  281. char MemPool_[MemPoolSize_ + MemPoolReserve_];
  282. };
  283. //protector for limit usage tcp connection output (and used data) only from one thread at same time
  284. class TOutputLock {
  285. public:
  286. TOutputLock() noexcept
  287. : Lock_(0)
  288. {
  289. }
  290. bool TryAquire() noexcept {
  291. do {
  292. if (AtomicTryLock(&Lock_)) {
  293. return true;
  294. }
  295. } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable
  296. return false;
  297. }
  298. void Release() noexcept {
  299. AtomicUnlock(&Lock_);
  300. }
  301. bool IsFree() const noexcept {
  302. return !AtomicGet(Lock_);
  303. }
  304. private:
  305. TAtomic Lock_;
  306. };
  307. class TClient {
  308. class TRequest;
  309. class TConnection;
  310. typedef TIntrusivePtr<TRequest> TRequestRef;
  311. typedef TIntrusivePtr<TConnection> TConnectionRef;
  312. class TRequest: public TThrRefBase, public TNonCopyable {
  313. public:
  314. class THandle: public TSimpleHandle {
  315. public:
  316. THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
  317. : TSimpleHandle(f, msg, s)
  318. {
  319. }
  320. bool MessageSendedCompletely() const noexcept override {
  321. if (TSimpleHandle::MessageSendedCompletely()) {
  322. return true;
  323. }
  324. TRequestRef req = GetRequest();
  325. if (!!req && req->RequestSendedCompletely()) {
  326. const_cast<THandle*>(this)->SetSendComplete();
  327. }
  328. return TSimpleHandle::MessageSendedCompletely();
  329. }
  330. void Cancel() noexcept override {
  331. if (TSimpleHandle::Canceled()) {
  332. return;
  333. }
  334. TRequestRef req = GetRequest();
  335. if (!!req) {
  336. req->Cancel();
  337. TSimpleHandle::Cancel();
  338. }
  339. }
  340. void NotifyResponse(const TString& resp) {
  341. TNotifyHandle::NotifyResponse(resp);
  342. ReleaseRequest();
  343. }
  344. void NotifyError(const TString& error) {
  345. TNotifyHandle::NotifyError(error);
  346. ReleaseRequest();
  347. }
  348. void NotifyError(TErrorRef error) {
  349. TNotifyHandle::NotifyError(error);
  350. ReleaseRequest();
  351. }
  352. //not thread safe!
  353. void SetRequest(const TRequestRef& r) noexcept {
  354. Req_ = r;
  355. }
  356. void ReleaseRequest() noexcept {
  357. TRequestRef tmp;
  358. TGuard<TSpinLock> g(SP_);
  359. tmp.Swap(Req_);
  360. }
  361. private:
  362. TRequestRef GetRequest() const noexcept {
  363. TGuard<TSpinLock> g(SP_);
  364. return Req_;
  365. }
  366. mutable TSpinLock SP_;
  367. TRequestRef Req_;
  368. };
  369. typedef TIntrusivePtr<THandle> THandleRef;
  370. static void Run(THandleRef& h, const TMessage& msg, TClient& clnt) {
  371. TRequestRef req(new TRequest(h, msg, clnt));
  372. h->SetRequest(req);
  373. req->Run(req);
  374. }
  375. ~TRequest() override {
  376. DBGOUT("TClient::~TRequest()");
  377. }
  378. private:
  379. TRequest(THandleRef& h, TMessage msg, TClient& clnt)
  380. : Hndl_(h)
  381. , Clnt_(clnt)
  382. , Msg_(std::move(msg))
  383. , Loc_(Msg_.Addr)
  384. , Addr_(CachedResolve(TResolveInfo(Loc_.Host, Loc_.GetPort())))
  385. , Canceled_(false)
  386. , Id_(0)
  387. {
  388. DBGOUT("TClient::TRequest()");
  389. }
  390. void Run(TRequestRef& req) {
  391. TDestination& dest = Clnt_.Dest_.Get(Addr_->Id);
  392. dest.Run(req);
  393. }
  394. public:
  395. void OnResponse(TTcp2Message& msg) {
  396. DBGOUT("TRequest::OnResponse: " << msg.ResponseHeader().Id);
  397. THandleRef h = ReleaseHandler();
  398. if (!h) {
  399. return;
  400. }
  401. const TResponseHeader& respHdr = msg.ResponseHeader();
  402. if (Y_LIKELY(!respHdr.ErrorCode)) {
  403. h->NotifyResponse(msg.Content());
  404. } else {
  405. h->NotifyError(new TError(respHdr.ErrorDescription(), TError::ProtocolSpecific, respHdr.ErrorCode));
  406. }
  407. ReleaseConn();
  408. }
  409. void OnError(const TString& err, const i32 systemCode = 0) {
  410. DBGOUT("TRequest::OnError: " << Id_.load(std::memory_order_acquire));
  411. THandleRef h = ReleaseHandler();
  412. if (!h) {
  413. return;
  414. }
  415. h->NotifyError(new TError(err, TError::UnknownType, 0, systemCode));
  416. ReleaseConn();
  417. }
  418. void SetConnection(TConnection* conn) noexcept {
  419. auto g = Guard(AL_);
  420. Conn_ = conn;
  421. }
  422. bool Canceled() const noexcept {
  423. return Canceled_;
  424. }
  425. const TResolvedHost* Addr() const noexcept {
  426. return Addr_;
  427. }
  428. TStringBuf Service() const noexcept {
  429. return Loc_.Service;
  430. }
  431. const TString& Data() const noexcept {
  432. return Msg_.Data;
  433. }
  434. TClient& Client() noexcept {
  435. return Clnt_;
  436. }
  437. bool RequestSendedCompletely() const noexcept {
  438. if (Id_.load(std::memory_order_acquire) == 0) {
  439. return false;
  440. }
  441. TConnectionRef conn = GetConn();
  442. if (!conn) {
  443. return false;
  444. }
  445. TRequestId lastSendedReqId = conn->LastSendedRequestId();
  446. if (lastSendedReqId >= Id_.load(std::memory_order_acquire)) {
  447. return true;
  448. } else if (Y_UNLIKELY((Id_.load(std::memory_order_acquire) - lastSendedReqId) > (Max<TRequestId>() - Max<ui32>()))) {
  449. //overflow req-id value
  450. return true;
  451. }
  452. return false;
  453. }
  454. void Cancel() noexcept {
  455. Canceled_ = true;
  456. THandleRef h = ReleaseHandler();
  457. if (!h) {
  458. return;
  459. }
  460. TConnectionRef conn = ReleaseConn();
  461. if (!!conn && Id_.load(std::memory_order_acquire)) {
  462. conn->Cancel(Id_.load(std::memory_order_acquire));
  463. }
  464. h->NotifyError(new TError(canceled, TError::Cancelled));
  465. }
  466. void SetReqId(TRequestId reqId) noexcept {
  467. auto guard = Guard(IdLock_);
  468. Id_.store(reqId, std::memory_order_release);
  469. }
  470. TRequestId ReqId() const noexcept {
  471. return Id_.load(std::memory_order_acquire);
  472. }
  473. private:
  474. inline THandleRef ReleaseHandler() noexcept {
  475. THandleRef h;
  476. {
  477. auto g = Guard(AL_);
  478. h.Swap(Hndl_);
  479. }
  480. return h;
  481. }
  482. inline TConnectionRef GetConn() const noexcept {
  483. auto g = Guard(AL_);
  484. return Conn_;
  485. }
  486. inline TConnectionRef ReleaseConn() noexcept {
  487. TConnectionRef c;
  488. {
  489. auto g = Guard(AL_);
  490. c.Swap(Conn_);
  491. }
  492. return c;
  493. }
  494. mutable TAdaptiveLock AL_; //guaranted calling notify() only once (prevent race between asio thread and current)
  495. THandleRef Hndl_;
  496. TClient& Clnt_;
  497. const TMessage Msg_;
  498. const TParsedLocation Loc_;
  499. const TResolvedHost* Addr_;
  500. TConnectionRef Conn_;
  501. NAtomic::TBool Canceled_;
  502. TSpinLock IdLock_;
  503. std::atomic<TRequestId> Id_;
  504. };
  505. class TConnection: public TThrRefBase {
  506. enum TState {
  507. Init,
  508. Connecting,
  509. Connected,
  510. Closed,
  511. MaxState
  512. };
  513. typedef THashMap<TRequestId, TRequestRef> TReqsInFly;
  514. public:
  515. class TOutputBuffers: public TMultiBuffers {
  516. public:
  517. void AddRequest(const TRequestRef& req) {
  518. Requests_.push_back(req);
  519. if (req->Service().size() > MemPoolReserve_) {
  520. TRequestHeader* hdr = new (Allocate<TRequestHeader>()) TRequestHeader(req->ReqId(), req->Service().size(), req->Data().size());
  521. AddPart(hdr, sizeof(TRequestHeader));
  522. AddPart(req->Service().data(), req->Service().size());
  523. } else {
  524. TRequestHeader* hdr = new (AllocatePlus<TRequestHeader>(req->Service().size())) TRequestHeader(req->ReqId(), req->Service().size(), req->Data().size());
  525. AddPart(hdr, sizeof(TRequestHeader) + req->Service().size());
  526. memmove(++hdr, req->Service().data(), req->Service().size());
  527. }
  528. AddPart(req->Data().data(), req->Data().size());
  529. IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
  530. }
  531. void AddCancelRequest(TRequestId reqId) {
  532. TCancelHeader* hdr = new (Allocate<TCancelHeader>()) TCancelHeader(reqId);
  533. AddPart(hdr, sizeof(TCancelHeader));
  534. IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
  535. }
  536. void Clear() {
  537. TMultiBuffers::Clear();
  538. Requests_.clear();
  539. }
  540. private:
  541. TVector<TRequestRef> Requests_;
  542. };
  543. TConnection(TIOService& srv)
  544. : AS_(srv)
  545. , State_(Init)
  546. , BuffSize_(TTcp2Options::InputBufferSize)
  547. , Buff_(new char[BuffSize_])
  548. , NeedCheckReqsQueue_(0)
  549. , NeedCheckCancelsQueue_(0)
  550. , GenReqId_(0)
  551. , LastSendedReqId_(0)
  552. {
  553. }
  554. ~TConnection() override {
  555. try {
  556. DBGOUT("TClient::~TConnection()");
  557. OnError("~");
  558. } catch (...) {
  559. Cdbg << "tcp2::~cln_conn: " << CurrentExceptionMessage() << Endl;
  560. }
  561. }
  562. //called from client thread
  563. bool Run(TRequestRef& req) {
  564. if (Y_UNLIKELY(AtomicGet(State_) == Closed)) {
  565. return false;
  566. }
  567. req->Ref();
  568. try {
  569. Reqs_.Enqueue(req.Get());
  570. } catch (...) {
  571. req->UnRef();
  572. throw;
  573. }
  574. AtomicSet(NeedCheckReqsQueue_, 1);
  575. req->SetConnection(this);
  576. TAtomicBase state = AtomicGet(State_);
  577. if (Y_LIKELY(state == Connected)) {
  578. ProcessOutputReqsQueue();
  579. return true;
  580. }
  581. if (state == Init) {
  582. if (AtomicCas(&State_, Connecting, Init)) {
  583. try {
  584. TEndpoint addr(new NAddr::TAddrInfo(&*req->Addr()->Addr.Begin()));
  585. AS_.AsyncConnect(addr, std::bind(&TConnection::OnConnect, TConnectionRef(this), _1, _2), TTcp2Options::ConnectTimeout);
  586. } catch (...) {
  587. AS_.GetIOService().Post(std::bind(&TConnection::OnErrorCallback, TConnectionRef(this), CurrentExceptionMessage()));
  588. }
  589. return true;
  590. }
  591. }
  592. state = AtomicGet(State_);
  593. if (state == Connected) {
  594. ProcessOutputReqsQueue();
  595. } else if (state == Closed) {
  596. SafeOnError();
  597. }
  598. return true;
  599. }
  600. //called from client thread
  601. void Cancel(TRequestId id) {
  602. Cancels_.Enqueue(id);
  603. AtomicSet(NeedCheckCancelsQueue_, 1);
  604. if (Y_LIKELY(AtomicGet(State_) == Connected)) {
  605. ProcessOutputCancelsQueue();
  606. }
  607. }
  608. void ProcessOutputReqsQueue() {
  609. if (OutputLock_.TryAquire()) {
  610. SendMessages(false);
  611. }
  612. }
  613. void ProcessOutputCancelsQueue() {
  614. if (OutputLock_.TryAquire()) {
  615. AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
  616. return;
  617. }
  618. }
  619. //must be called only from asio thread
  620. void ProcessReqsInFlyQueue() {
  621. if (AtomicGet(State_) == Closed) {
  622. return;
  623. }
  624. TRequest* reqPtr;
  625. while (ReqsInFlyQueue_.Dequeue(&reqPtr)) {
  626. TRequestRef reqTmp(reqPtr);
  627. reqPtr->UnRef();
  628. ReqsInFly_[reqPtr->ReqId()].Swap(reqTmp);
  629. }
  630. }
  631. //must be called only from asio thread
  632. void OnConnect(const TErrorCode& ec, IHandlingContext&) {
  633. DBGOUT("TConnect::OnConnect: " << ec.Value());
  634. if (Y_UNLIKELY(ec)) {
  635. if (ec.Value() == EIO) {
  636. //try get more detail error info
  637. char buf[1];
  638. TErrorCode errConnect;
  639. AS_.ReadSome(buf, 1, errConnect);
  640. OnErrorCode(errConnect.Value() ? errConnect : ec);
  641. } else {
  642. OnErrorCode(ec);
  643. }
  644. } else {
  645. try {
  646. PrepareSocket(AS_.Native());
  647. AtomicSet(State_, Connected);
  648. AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2));
  649. if (OutputLock_.TryAquire()) {
  650. SendMessages(true);
  651. return;
  652. }
  653. } catch (...) {
  654. OnError(CurrentExceptionMessage());
  655. }
  656. }
  657. }
  658. //must be called only after succes aquiring output
  659. void SendMessages(bool asioThread) {
  660. //DBGOUT("SendMessages");
  661. if (Y_UNLIKELY(AtomicGet(State_) == Closed)) {
  662. if (asioThread) {
  663. OnError(Error_);
  664. } else {
  665. SafeOnError();
  666. }
  667. return;
  668. }
  669. do {
  670. if (asioThread) {
  671. AtomicSet(NeedCheckCancelsQueue_, 0);
  672. TRequestId reqId;
  673. ProcessReqsInFlyQueue();
  674. while (Cancels_.Dequeue(&reqId)) {
  675. TReqsInFly::iterator it = ReqsInFly_.find(reqId);
  676. if (it == ReqsInFly_.end()) {
  677. continue;
  678. }
  679. ReqsInFly_.erase(it);
  680. OutputBuffers_.AddCancelRequest(reqId);
  681. if (Y_UNLIKELY(!OutputBuffers_.HasFreeSpace())) {
  682. if (!FlushOutputBuffers(asioThread, 0)) {
  683. return;
  684. }
  685. }
  686. }
  687. } else if (AtomicGet(NeedCheckCancelsQueue_)) {
  688. AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
  689. return;
  690. }
  691. TRequestId lastReqId = 0;
  692. {
  693. AtomicSet(NeedCheckReqsQueue_, 0);
  694. TRequest* reqPtr;
  695. while (Reqs_.Dequeue(&reqPtr)) {
  696. TRequestRef reqTmp(reqPtr);
  697. reqPtr->UnRef();
  698. reqPtr->SetReqId(GenerateReqId());
  699. if (reqPtr->Canceled()) {
  700. continue;
  701. }
  702. lastReqId = reqPtr->ReqId();
  703. if (asioThread) {
  704. TRequestRef& req = ReqsInFly_[(TRequestId)reqPtr->ReqId()];
  705. req.Swap(reqTmp);
  706. OutputBuffers_.AddRequest(req);
  707. } else { //can access to ReqsInFly_ only from asio thread, so enqueue req to update ReqsInFly_ queue
  708. try {
  709. reqTmp->Ref();
  710. ReqsInFlyQueue_.Enqueue(reqPtr);
  711. } catch (...) {
  712. reqTmp->UnRef();
  713. throw;
  714. }
  715. OutputBuffers_.AddRequest(reqTmp);
  716. }
  717. if (Y_UNLIKELY(!OutputBuffers_.HasFreeSpace())) {
  718. if (!FlushOutputBuffers(asioThread, lastReqId)) {
  719. return;
  720. }
  721. }
  722. }
  723. }
  724. if (OutputBuffers_.HasData()) {
  725. if (!FlushOutputBuffers(asioThread, lastReqId)) {
  726. return;
  727. }
  728. }
  729. OutputLock_.Release();
  730. if (!AtomicGet(NeedCheckReqsQueue_) && !AtomicGet(NeedCheckCancelsQueue_)) {
  731. DBGOUT("TClient::SendMessages(exit2)");
  732. return;
  733. }
  734. } while (OutputLock_.TryAquire());
  735. DBGOUT("TClient::SendMessages(exit1)");
  736. }
  737. TRequestId GenerateReqId() noexcept {
  738. TRequestId reqId;
  739. {
  740. auto guard = Guard(GenReqIdLock_);
  741. reqId = ++GenReqId_;
  742. }
  743. return Y_LIKELY(reqId) ? reqId : GenerateReqId();
  744. }
  745. //called non thread-safe (from outside thread)
  746. bool FlushOutputBuffers(bool asioThread, TRequestId reqId) {
  747. if (asioThread || TTcp2Options::ClientUseDirectWrite) {
  748. TContIOVector& vec = *OutputBuffers_.GetIOvec();
  749. TErrorCode err;
  750. vec.Proceed(AS_.WriteSome(vec, err));
  751. if (Y_UNLIKELY(err)) {
  752. if (asioThread) {
  753. OnErrorCode(err);
  754. } else {
  755. AS_.GetIOService().Post(std::bind(&TConnection::OnErrorCode, TConnectionRef(this), err));
  756. }
  757. return false;
  758. }
  759. if (vec.Complete()) {
  760. LastSendedReqId_.store(reqId, std::memory_order_release);
  761. DBGOUT("Client::FlushOutputBuffers(" << reqId << ")");
  762. OutputBuffers_.Clear();
  763. return true;
  764. }
  765. }
  766. DBGOUT("Client::AsyncWrite(" << reqId << ")");
  767. AS_.AsyncWrite(OutputBuffers_.GetIOvec(), std::bind(&TConnection::OnSend, TConnectionRef(this), reqId, _1, _2, _3), TTcp2Options::ServerOutputDeadline);
  768. return false;
  769. }
  770. //must be called only from asio thread
  771. void OnSend(TRequestId reqId, const TErrorCode& ec, size_t amount, IHandlingContext&) {
  772. Y_UNUSED(amount);
  773. if (Y_UNLIKELY(ec)) {
  774. OnErrorCode(ec);
  775. } else {
  776. if (Y_LIKELY(reqId)) {
  777. DBGOUT("Client::OnSend(" << reqId << ")");
  778. LastSendedReqId_.store(reqId, std::memory_order_release);
  779. }
  780. //output already aquired, used asio thread
  781. OutputBuffers_.Clear();
  782. SendMessages(true);
  783. }
  784. }
  785. //must be called only from asio thread
  786. void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
  787. //DBGOUT("OnCanRead(" << ec.Value() << ")");
  788. if (Y_UNLIKELY(ec)) {
  789. OnErrorCode(ec);
  790. } else {
  791. TErrorCode ec2;
  792. OnReadSome(ec2, AS_.ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
  793. }
  794. }
  795. //must be called only from asio thread
  796. void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
  797. //DBGOUT("OnReadSome(" << ec.Value() << ", " << amount << ")");
  798. if (Y_UNLIKELY(ec)) {
  799. OnErrorCode(ec);
  800. return;
  801. }
  802. while (1) {
  803. if (Y_UNLIKELY(!amount)) {
  804. OnError("tcp conn. closed");
  805. return;
  806. }
  807. try {
  808. const char* buff = Buff_.Get();
  809. size_t leftBytes = amount;
  810. do {
  811. size_t useBytes = Msg_.LoadFrom(buff, leftBytes);
  812. leftBytes -= useBytes;
  813. buff += useBytes;
  814. if (Msg_.IsComplete()) {
  815. //DBGOUT("OnReceiveMessage(" << Msg_.BaseHeader().Id << "): " << leftBytes);
  816. OnReceiveMessage();
  817. Msg_.Clear();
  818. }
  819. } while (leftBytes);
  820. if (amount == BuffSize_) {
  821. //try decrease system calls, - re-run ReadSome if has full filled buffer
  822. TErrorCode ecR;
  823. amount = AS_.ReadSome(Buff_.Get(), BuffSize_, ecR);
  824. if (!ecR) {
  825. continue; //process next input data
  826. }
  827. if (ecR.Value() == EAGAIN || ecR.Value() == EWOULDBLOCK) {
  828. ctx.ContinueUseHandler();
  829. } else {
  830. OnErrorCode(ec);
  831. }
  832. } else {
  833. ctx.ContinueUseHandler();
  834. }
  835. } catch (...) {
  836. OnError(CurrentExceptionMessage());
  837. }
  838. return;
  839. }
  840. }
  841. //must be called only from asio thread
  842. void OnErrorCode(TErrorCode ec) {
  843. OnError(ec.Text(), ec.Value());
  844. }
  845. //must be called only from asio thread
  846. void OnErrorCallback(TString err) {
  847. OnError(err);
  848. }
  849. //must be called only from asio thread
  850. void OnError(const TString& err, const i32 systemCode = 0) {
  851. if (AtomicGet(State_) != Closed) {
  852. Error_ = err;
  853. SystemCode_ = systemCode;
  854. AtomicSet(State_, Closed);
  855. AS_.AsyncCancel();
  856. }
  857. SafeOnError();
  858. for (auto& it : ReqsInFly_) {
  859. it.second->OnError(err);
  860. }
  861. ReqsInFly_.clear();
  862. }
  863. void SafeOnError() {
  864. TRequest* reqPtr;
  865. while (Reqs_.Dequeue(&reqPtr)) {
  866. TRequestRef req(reqPtr);
  867. reqPtr->UnRef();
  868. //DBGOUT("err queue(" << AS_.Native() << "):" << size_t(reqPtr));
  869. req->OnError(Error_, SystemCode_);
  870. }
  871. while (ReqsInFlyQueue_.Dequeue(&reqPtr)) {
  872. TRequestRef req(reqPtr);
  873. reqPtr->UnRef();
  874. //DBGOUT("err fly queue(" << AS_.Native() << "):" << size_t(reqPtr));
  875. req->OnError(Error_, SystemCode_);
  876. }
  877. }
  878. //must be called only from asio thread
  879. void OnReceiveMessage() {
  880. //DBGOUT("OnReceiveMessage");
  881. const TBaseHeader& hdr = Msg_.BaseHeader();
  882. if (hdr.Type == TBaseHeader::Response) {
  883. ProcessReqsInFlyQueue();
  884. TReqsInFly::iterator it = ReqsInFly_.find(hdr.Id);
  885. if (it == ReqsInFly_.end()) {
  886. DBGOUT("ignore response: " << hdr.Id);
  887. return;
  888. }
  889. it->second->OnResponse(Msg_);
  890. ReqsInFly_.erase(it);
  891. } else {
  892. throw yexception() << TStringBuf("unsupported message type: ") << hdr.Type;
  893. }
  894. }
  895. TRequestId LastSendedRequestId() const noexcept {
  896. return LastSendedReqId_.load(std::memory_order_acquire);
  897. }
  898. private:
  899. NAsio::TTcpSocket AS_;
  900. TAtomic State_; //state machine status (TState)
  901. TString Error_;
  902. i32 SystemCode_ = 0;
  903. //input
  904. size_t BuffSize_;
  905. TArrayHolder<char> Buff_;
  906. TTcp2Message Msg_;
  907. //output
  908. TOutputLock OutputLock_;
  909. TAtomic NeedCheckReqsQueue_;
  910. TLockFreeQueue<TRequest*> Reqs_;
  911. TAtomic NeedCheckCancelsQueue_;
  912. TLockFreeQueue<TRequestId> Cancels_;
  913. TAdaptiveLock GenReqIdLock_;
  914. std::atomic<TRequestId> GenReqId_;
  915. std::atomic<TRequestId> LastSendedReqId_;
  916. TLockFreeQueue<TRequest*> ReqsInFlyQueue_;
  917. TReqsInFly ReqsInFly_;
  918. TOutputBuffers OutputBuffers_;
  919. };
  920. class TDestination {
  921. public:
  922. void Run(TRequestRef& req) {
  923. while (1) {
  924. TConnectionRef conn = GetConnection();
  925. if (!!conn && conn->Run(req)) {
  926. return;
  927. }
  928. DBGOUT("TDestination CreateConnection");
  929. CreateConnection(conn, req->Client().ExecutorsPool().GetExecutor().GetIOService());
  930. }
  931. }
  932. private:
  933. TConnectionRef GetConnection() {
  934. TGuard<TSpinLock> g(L_);
  935. return Conn_;
  936. }
  937. void CreateConnection(TConnectionRef& oldConn, TIOService& srv) {
  938. TConnectionRef conn(new TConnection(srv));
  939. TGuard<TSpinLock> g(L_);
  940. if (Conn_ == oldConn) {
  941. Conn_.Swap(conn);
  942. }
  943. }
  944. TSpinLock L_;
  945. TConnectionRef Conn_;
  946. };
  947. //////////// TClient /////////
  948. public:
  949. TClient()
  950. : EP_(TTcp2Options::AsioClientThreads)
  951. {
  952. }
  953. ~TClient() {
  954. EP_.SyncShutdown();
  955. }
  956. THandleRef Schedule(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) {
  957. //find exist connection or create new
  958. TRequest::THandleRef hndl(new TRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
  959. try {
  960. TRequest::Run(hndl, msg, *this);
  961. } catch (...) {
  962. hndl->ResetOnRecv();
  963. hndl->ReleaseRequest();
  964. throw;
  965. }
  966. return hndl.Get();
  967. }
  968. TExecutorsPool& ExecutorsPool() {
  969. return EP_;
  970. }
  971. private:
  972. NNeh::NHttp::TLockFreeSequence<TDestination> Dest_;
  973. TExecutorsPool EP_;
  974. };
  975. ////////// server side ////////////////////////////////////////////////////////////////////////////////////////////
  976. class TServer: public IRequester {
  977. typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
  978. typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
  979. class TConnection;
  980. typedef TIntrusivePtr<TConnection> TConnectionRef;
  981. struct TRequest: public IRequest {
  982. struct TState: public TThrRefBase {
  983. TState()
  984. : Canceled(false)
  985. {
  986. }
  987. TAtomicBool Canceled;
  988. };
  989. typedef TIntrusivePtr<TState> TStateRef;
  990. TRequest(const TConnectionRef& conn, TBuffer& buf, const TString& content);
  991. ~TRequest() override;
  992. TStringBuf Scheme() const override {
  993. return TStringBuf("tcp2");
  994. }
  995. TString RemoteHost() const override;
  996. TStringBuf Service() const override {
  997. return TStringBuf(Buf.Data() + sizeof(TRequestHeader), Buf.End());
  998. }
  999. TStringBuf Data() const override {
  1000. return TStringBuf(Content_);
  1001. }
  1002. TStringBuf RequestId() const override {
  1003. return TStringBuf();
  1004. }
  1005. bool Canceled() const override {
  1006. return State->Canceled;
  1007. }
  1008. void SendReply(TData& data) override;
  1009. void SendError(TResponseError, const TString&) override {
  1010. // TODO
  1011. }
  1012. const TRequestHeader& RequestHeader() const noexcept {
  1013. return *reinterpret_cast<const TRequestHeader*>(Buf.Data());
  1014. }
  1015. private:
  1016. TConnectionRef Conn;
  1017. TBuffer Buf; //service-name + message-data
  1018. TString Content_;
  1019. TAtomic Replied_;
  1020. public:
  1021. TIntrusivePtr<TState> State;
  1022. };
  1023. class TConnection: public TThrRefBase {
  1024. private:
  1025. TConnection(TServer& srv, const TTcpSocketRef& sock)
  1026. : Srv_(srv)
  1027. , AS_(sock)
  1028. , Canceled_(false)
  1029. , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
  1030. , BuffSize_(TTcp2Options::InputBufferSize)
  1031. , Buff_(new char[BuffSize_])
  1032. , NeedCheckOutputQueue_(0)
  1033. {
  1034. DBGOUT("TServer::TConnection()");
  1035. }
  1036. public:
  1037. class TOutputBuffers: public TMultiBuffers {
  1038. public:
  1039. void AddResponse(TRequestId reqId, TData& data) {
  1040. TResponseHeader* hdr = new (Allocate<TResponseHeader>()) TResponseHeader(reqId, TResponseHeader::Success, data.size());
  1041. ResponseData_.push_back(TAutoPtr<TData>(new TData()));
  1042. TData& movedData = *ResponseData_.back();
  1043. movedData.swap(data);
  1044. AddPart(hdr, sizeof(TResponseHeader));
  1045. AddPart(movedData.data(), movedData.size());
  1046. IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
  1047. }
  1048. void AddError(TRequestId reqId, TResponseHeader::TErrorCode errCode) {
  1049. TResponseHeader* hdr = new (Allocate<TResponseHeader>()) TResponseHeader(reqId, errCode, 0);
  1050. AddPart(hdr, sizeof(TResponseHeader));
  1051. IOVec_ = TContIOVector(Parts_.data(), Parts_.size());
  1052. }
  1053. void Clear() {
  1054. TMultiBuffers::Clear();
  1055. ResponseData_.clear();
  1056. }
  1057. private:
  1058. TVector<TAutoPtr<TData>> ResponseData_;
  1059. };
  1060. static void Create(TServer& srv, const TTcpSocketRef& sock) {
  1061. TConnectionRef conn(new TConnection(srv, sock));
  1062. conn->AS_->AsyncPollRead(std::bind(&TConnection::OnCanRead, conn, _1, _2), TTcp2Options::ServerInputDeadline);
  1063. }
  1064. ~TConnection() override {
  1065. DBGOUT("~TServer::TConnection(" << (!AS_ ? -666 : AS_->Native()) << ")");
  1066. }
  1067. private:
  1068. void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
  1069. if (ec) {
  1070. OnError();
  1071. } else {
  1072. TErrorCode ec2;
  1073. OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
  1074. }
  1075. }
  1076. void OnError() {
  1077. DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")"
  1078. << " c=" << (size_t)this);
  1079. Canceled_ = true;
  1080. AS_->AsyncCancel();
  1081. }
  1082. void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
  1083. while (1) {
  1084. if (ec || !amount) {
  1085. OnError();
  1086. return;
  1087. }
  1088. try {
  1089. const char* buff = Buff_.Get();
  1090. size_t leftBytes = amount;
  1091. do {
  1092. size_t useBytes = Msg_.LoadFrom(buff, leftBytes);
  1093. leftBytes -= useBytes;
  1094. buff += useBytes;
  1095. if (Msg_.IsComplete()) {
  1096. OnReceiveMessage();
  1097. }
  1098. } while (leftBytes);
  1099. if (amount == BuffSize_) {
  1100. //try decrease system calls, - re-run ReadSome if has full filled buffer
  1101. TErrorCode ecR;
  1102. amount = AS_->ReadSome(Buff_.Get(), BuffSize_, ecR);
  1103. if (!ecR) {
  1104. continue;
  1105. }
  1106. if (ecR.Value() == EAGAIN || ecR.Value() == EWOULDBLOCK) {
  1107. ctx.ContinueUseHandler();
  1108. } else {
  1109. OnError();
  1110. }
  1111. } else {
  1112. ctx.ContinueUseHandler();
  1113. }
  1114. } catch (...) {
  1115. DBGOUT("exc. " << CurrentExceptionMessage());
  1116. OnError();
  1117. }
  1118. return;
  1119. }
  1120. }
  1121. void OnReceiveMessage() {
  1122. DBGOUT("OnReceiveMessage()");
  1123. const TBaseHeader& hdr = Msg_.BaseHeader();
  1124. if (hdr.Type == TBaseHeader::Request) {
  1125. TRequest* reqPtr = new TRequest(TConnectionRef(this), Msg_.Header(), Msg_.Content());
  1126. IRequestRef req(reqPtr);
  1127. ReqsState_[reqPtr->RequestHeader().Id] = reqPtr->State;
  1128. OnRequest(req);
  1129. } else if (hdr.Type == TBaseHeader::Cancel) {
  1130. OnCancelRequest(hdr.Id);
  1131. } else {
  1132. throw yexception() << "unsupported message type: " << (ui32)hdr.Type;
  1133. }
  1134. Msg_.Clear();
  1135. {
  1136. TRequestId reqId;
  1137. while (FinReqs_.Dequeue(&reqId)) {
  1138. ReqsState_.erase(reqId);
  1139. }
  1140. }
  1141. }
  1142. void OnRequest(IRequestRef& r) {
  1143. DBGOUT("OnRequest()");
  1144. Srv_.OnRequest(r);
  1145. }
  1146. void OnCancelRequest(TRequestId reqId) {
  1147. THashMap<TRequestId, TRequest::TStateRef>::iterator it = ReqsState_.find(reqId);
  1148. if (it == ReqsState_.end()) {
  1149. return;
  1150. }
  1151. it->second->Canceled = true;
  1152. }
  1153. public:
  1154. class TOutputData {
  1155. public:
  1156. TOutputData(TRequestId reqId)
  1157. : ReqId(reqId)
  1158. {
  1159. }
  1160. virtual ~TOutputData() {
  1161. }
  1162. virtual void MoveTo(TOutputBuffers& bufs) = 0;
  1163. TRequestId ReqId;
  1164. };
  1165. class TResponseData: public TOutputData {
  1166. public:
  1167. TResponseData(TRequestId reqId, TData& data)
  1168. : TOutputData(reqId)
  1169. {
  1170. Data.swap(data);
  1171. }
  1172. void MoveTo(TOutputBuffers& bufs) override {
  1173. bufs.AddResponse(ReqId, Data);
  1174. }
  1175. TData Data;
  1176. };
  1177. class TResponseErrorData: public TOutputData {
  1178. public:
  1179. TResponseErrorData(TRequestId reqId, TResponseHeader::TErrorCode errorCode)
  1180. : TOutputData(reqId)
  1181. , ErrorCode(errorCode)
  1182. {
  1183. }
  1184. void MoveTo(TOutputBuffers& bufs) override {
  1185. bufs.AddError(ReqId, ErrorCode);
  1186. }
  1187. TResponseHeader::TErrorCode ErrorCode;
  1188. };
  1189. //called non thread-safe (from client thread)
  1190. void SendResponse(TRequestId reqId, TData& data) {
  1191. DBGOUT("SendResponse: " << reqId << " " << (size_t)~data << " c=" << (size_t)this);
  1192. TAutoPtr<TOutputData> od(new TResponseData(reqId, data));
  1193. OutputData_.Enqueue(od);
  1194. ProcessOutputQueue();
  1195. }
  1196. //called non thread-safe (from outside thread)
  1197. void SendError(TRequestId reqId, TResponseHeader::TErrorCode err) {
  1198. DBGOUT("SendResponseError: " << reqId << " c=" << (size_t)this);
  1199. TAutoPtr<TOutputData> od(new TResponseErrorData(reqId, err));
  1200. OutputData_.Enqueue(od);
  1201. ProcessOutputQueue();
  1202. }
  1203. void ProcessOutputQueue() {
  1204. AtomicSet(NeedCheckOutputQueue_, 1);
  1205. if (OutputLock_.TryAquire()) {
  1206. SendMessages(false);
  1207. return;
  1208. }
  1209. DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree());
  1210. }
  1211. //must be called only after success aquiring output
  1212. void SendMessages(bool asioThread) {
  1213. DBGOUT("TServer::SendMessages(enter)");
  1214. try {
  1215. do {
  1216. AtomicUnlock(&NeedCheckOutputQueue_);
  1217. TAutoPtr<TOutputData> d;
  1218. while (OutputData_.Dequeue(&d)) {
  1219. d->MoveTo(OutputBuffers_);
  1220. if (!OutputBuffers_.HasFreeSpace()) {
  1221. if (!FlushOutputBuffers(asioThread)) {
  1222. return;
  1223. }
  1224. }
  1225. }
  1226. if (OutputBuffers_.HasData()) {
  1227. if (!FlushOutputBuffers(asioThread)) {
  1228. return;
  1229. }
  1230. }
  1231. OutputLock_.Release();
  1232. if (!AtomicGet(NeedCheckOutputQueue_)) {
  1233. DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree());
  1234. return;
  1235. }
  1236. } while (OutputLock_.TryAquire());
  1237. DBGOUT("Server::SendMessages(exit1)");
  1238. } catch (...) {
  1239. OnError();
  1240. }
  1241. }
  1242. bool FlushOutputBuffers(bool asioThread) {
  1243. DBGOUT("FlushOutputBuffers: cnt=" << OutputBuffers_.GetIOvec()->Count() << " c=" << (size_t)this);
  1244. //TODO:reseach direct write efficiency
  1245. if (asioThread || TTcp2Options::ServerUseDirectWrite) {
  1246. TContIOVector& vec = *OutputBuffers_.GetIOvec();
  1247. vec.Proceed(AS_->WriteSome(vec));
  1248. if (vec.Complete()) {
  1249. OutputBuffers_.Clear();
  1250. //DBGOUT("WriteResponse: " << " c=" << (size_t)this);
  1251. return true;
  1252. }
  1253. }
  1254. //socket buffer filled - use async write for sending left data
  1255. DBGOUT("AsyncWriteResponse: "
  1256. << " [" << OutputBuffers_.GetIOvec()->Bytes() << "]"
  1257. << " c=" << (size_t)this);
  1258. AS_->AsyncWrite(OutputBuffers_.GetIOvec(), std::bind(&TConnection::OnSend, TConnectionRef(this), _1, _2, _3), TTcp2Options::ServerOutputDeadline);
  1259. return false;
  1260. }
  1261. void OnFinishRequest(TRequestId reqId) {
  1262. if (Y_LIKELY(!Canceled_)) {
  1263. FinReqs_.Enqueue(reqId);
  1264. }
  1265. }
  1266. private:
  1267. void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
  1268. Y_UNUSED(amount);
  1269. DBGOUT("TServer::OnSend(" << ec.Value() << ", " << amount << ")");
  1270. if (ec) {
  1271. OnError();
  1272. } else {
  1273. OutputBuffers_.Clear();
  1274. SendMessages(true);
  1275. }
  1276. }
  1277. public:
  1278. bool IsCanceled() const noexcept {
  1279. return Canceled_;
  1280. }
  1281. const TString& RemoteHost() const noexcept {
  1282. return RemoteHost_;
  1283. }
  1284. private:
  1285. TServer& Srv_;
  1286. TTcpSocketRef AS_;
  1287. NAtomic::TBool Canceled_;
  1288. TString RemoteHost_;
  1289. //input
  1290. size_t BuffSize_;
  1291. TArrayHolder<char> Buff_;
  1292. TTcp2Message Msg_;
  1293. THashMap<TRequestId, TRequest::TStateRef> ReqsState_;
  1294. TLockFreeQueue<TRequestId> FinReqs_;
  1295. //output
  1296. TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
  1297. TAtomic NeedCheckOutputQueue_;
  1298. NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
  1299. TOutputBuffers OutputBuffers_;
  1300. };
  1301. //////////// TServer /////////
  1302. public:
  1303. TServer(IOnRequest* cb, ui16 port)
  1304. : EP_(TTcp2Options::AsioServerThreads)
  1305. , CB_(cb)
  1306. {
  1307. TNetworkAddress addr(port);
  1308. for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
  1309. TEndpoint ep(new NAddr::TAddrInfo(&*it));
  1310. TTcpAcceptorPtr a(new TTcpAcceptor(EA_.GetIOService()));
  1311. //DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
  1312. a->Bind(ep);
  1313. a->Listen(TTcp2Options::Backlog);
  1314. StartAccept(a.Get());
  1315. A_.push_back(a);
  1316. }
  1317. }
  1318. ~TServer() override {
  1319. EA_.SyncShutdown(); //cancel accepting connections
  1320. A_.clear(); //stop listening
  1321. EP_.SyncShutdown(); //close all exist connections
  1322. }
  1323. void StartAccept(TTcpAcceptor* a) {
  1324. const auto s = MakeAtomicShared<TTcpSocket>(EP_.Size() ? EP_.GetExecutor().GetIOService() : EA_.GetIOService());
  1325. a->AsyncAccept(*s, std::bind(&TServer::OnAccept, this, a, s, _1, _2));
  1326. }
  1327. void OnAccept(TTcpAcceptor* a, TTcpSocketRef s, const TErrorCode& ec, IHandlingContext&) {
  1328. if (Y_UNLIKELY(ec)) {
  1329. if (ec.Value() == ECANCELED) {
  1330. return;
  1331. } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
  1332. //reach some os limit, suspend accepting for preventing busyloop (100% cpu usage)
  1333. TSimpleSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
  1334. dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&TServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
  1335. } else {
  1336. Cdbg << "acc: " << ec.Text() << Endl;
  1337. }
  1338. } else {
  1339. SetNonBlock(s->Native());
  1340. PrepareSocket(s->Native());
  1341. TConnection::Create(*this, s);
  1342. }
  1343. StartAccept(a); //continue accepting
  1344. }
  1345. void OnTimeoutSuspendAccept(TTcpAcceptor* a, TSimpleSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
  1346. if (!ec) {
  1347. DBGOUT("resume acceptor");
  1348. StartAccept(a);
  1349. }
  1350. }
  1351. void OnRequest(IRequestRef& r) {
  1352. try {
  1353. CB_->OnRequest(r);
  1354. } catch (...) {
  1355. Cdbg << CurrentExceptionMessage() << Endl;
  1356. }
  1357. }
  1358. private:
  1359. TVector<TTcpAcceptorPtr> A_;
  1360. TIOServiceExecutor EA_; //thread, where accepted incoming tcp connections
  1361. TExecutorsPool EP_; //threads, for process write/read data to/from tcp connections (if empty, use EA_ for r/w)
  1362. IOnRequest* CB_;
  1363. };
  1364. TServer::TRequest::TRequest(const TConnectionRef& conn, TBuffer& buf, const TString& content)
  1365. : Conn(conn)
  1366. , Content_(content)
  1367. , Replied_(0)
  1368. , State(new TState())
  1369. {
  1370. DBGOUT("TServer::TRequest()");
  1371. Buf.Swap(buf);
  1372. }
  1373. TServer::TRequest::~TRequest() {
  1374. DBGOUT("TServer::~TRequest()");
  1375. if (!AtomicGet(Replied_)) {
  1376. Conn->SendError(RequestHeader().Id, TResponseHeader::EmptyReply);
  1377. }
  1378. Conn->OnFinishRequest(RequestHeader().Id);
  1379. }
  1380. TString TServer::TRequest::RemoteHost() const {
  1381. return Conn->RemoteHost();
  1382. }
  1383. void TServer::TRequest::SendReply(TData& data) {
  1384. do {
  1385. if (AtomicCas(&Replied_, 1, 0)) {
  1386. Conn->SendResponse(RequestHeader().Id, data);
  1387. return;
  1388. }
  1389. } while (AtomicGet(Replied_) == 0);
  1390. }
  1391. class TProtocol: public IProtocol {
  1392. public:
  1393. inline TProtocol() {
  1394. InitNetworkSubSystem();
  1395. }
  1396. IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
  1397. return new TServer(cb, loc.GetPort());
  1398. }
  1399. THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
  1400. return Singleton<TClient>()->Schedule(msg, fallback, ss);
  1401. }
  1402. TStringBuf Scheme() const noexcept override {
  1403. return TStringBuf("tcp2");
  1404. }
  1405. bool SetOption(TStringBuf name, TStringBuf value) override {
  1406. return TTcp2Options::Set(name, value);
  1407. }
  1408. };
  1409. }
  1410. }
  1411. NNeh::IProtocol* NNeh::Tcp2Protocol() {
  1412. return Singleton<NNehTcp2::TProtocol>();
  1413. }