http2.cpp 73 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089
  1. #include "http2.h"
  2. #include "conn_cache.h"
  3. #include "details.h"
  4. #include "factory.h"
  5. #include "http_common.h"
  6. #include "smart_ptr.h"
  7. #include "utils.h"
  8. #include <library/cpp/http/push_parser/http_parser.h>
  9. #include <library/cpp/http/misc/httpcodes.h>
  10. #include <library/cpp/http/misc/parsed_request.h>
  11. #include <library/cpp/neh/asio/executor.h>
  12. #include <util/generic/singleton.h>
  13. #include <util/generic/vector.h>
  14. #include <util/network/iovec.h>
  15. #include <util/stream/output.h>
  16. #include <util/stream/zlib.h>
  17. #include <util/system/condvar.h>
  18. #include <util/system/mutex.h>
  19. #include <util/system/spinlock.h>
  20. #include <util/system/yassert.h>
  21. #include <util/thread/factory.h>
  22. #include <util/thread/singleton.h>
  23. #include <util/system/sanitizers.h>
  24. #include <util/system/thread.h>
  25. #include <atomic>
  26. #if defined(_unix_)
  27. #include <sys/ioctl.h>
  28. #endif
  29. #if defined(_linux_)
  30. #undef SIOCGSTAMP
  31. #undef SIOCGSTAMPNS
  32. #include <linux/sockios.h>
  33. #define FIONWRITE SIOCOUTQ
  34. #endif
  35. //#define DEBUG_HTTP2
  36. #ifdef DEBUG_HTTP2
  37. #define DBGOUT(args) Cout << args << Endl;
  38. #else
  39. #define DBGOUT(args)
  40. #endif
  41. using namespace NDns;
  42. using namespace NAsio;
  43. using namespace NNeh;
  44. using namespace NNeh::NHttp;
  45. using namespace NNeh::NHttp2;
  46. using namespace std::placeholders;
  47. //
  48. // has complex keep-alive references between entities in multi-thread enviroment,
  49. // this create risks for races/memory leak, etc..
  50. // so connecting/disconnecting entities must be doing carefully
  51. //
  52. // handler <=-> request <==> connection(socket) <= handlers, stored in io_service
  53. // ^
  54. // +== connections_cache
  55. // '=>' -- shared/intrusive ptr
  56. // '->' -- weak_ptr
  57. //
  58. static TDuration FixTimeoutForSanitizer(const TDuration timeout) {
  59. ui64 multiplier = 1;
  60. if (NSan::ASanIsOn()) {
  61. // https://github.com/google/sanitizers/wiki/AddressSanitizer
  62. multiplier = 4;
  63. } else if (NSan::MSanIsOn()) {
  64. // via https://github.com/google/sanitizers/wiki/MemorySanitizer
  65. multiplier = 3;
  66. } else if (NSan::TSanIsOn()) {
  67. // via https://clang.llvm.org/docs/ThreadSanitizer.html
  68. multiplier = 15;
  69. }
  70. return TDuration::FromValue(timeout.GetValue() * multiplier);
  71. }
  72. TDuration THttp2Options::ConnectTimeout = FixTimeoutForSanitizer(TDuration::MilliSeconds(1000));
  73. TDuration THttp2Options::InputDeadline = TDuration::Max();
  74. TDuration THttp2Options::OutputDeadline = TDuration::Max();
  75. TDuration THttp2Options::SymptomSlowConnect = FixTimeoutForSanitizer(TDuration::MilliSeconds(10));
  76. size_t THttp2Options::InputBufferSize = 16 * 1024;
  77. bool THttp2Options::KeepInputBufferForCachedConnections = false;
  78. size_t THttp2Options::AsioThreads = 4;
  79. size_t THttp2Options::AsioServerThreads = 4;
  80. bool THttp2Options::EnsureSendingCompleteByAck = false;
  81. int THttp2Options::Backlog = 100;
  82. TDuration THttp2Options::ServerInputDeadline = FixTimeoutForSanitizer(TDuration::MilliSeconds(500));
  83. TDuration THttp2Options::ServerOutputDeadline = TDuration::Max();
  84. TDuration THttp2Options::ServerInputDeadlineKeepAliveMax = FixTimeoutForSanitizer(TDuration::Seconds(120));
  85. TDuration THttp2Options::ServerInputDeadlineKeepAliveMin = FixTimeoutForSanitizer(TDuration::Seconds(10));
  86. bool THttp2Options::ServerUseDirectWrite = false;
  87. bool THttp2Options::UseResponseAsErrorMessage = false;
  88. bool THttp2Options::FullHeadersAsErrorMessage = false;
  89. bool THttp2Options::ErrorDetailsAsResponseBody = false;
  90. bool THttp2Options::RedirectionNotError = false;
  91. bool THttp2Options::AnyResponseIsNotError = false;
  92. bool THttp2Options::TcpKeepAlive = false;
  93. i32 THttp2Options::LimitRequestsPerConnection = -1;
  94. bool THttp2Options::QuickAck = false;
  95. bool THttp2Options::UseAsyncSendRequest = false;
  96. bool THttp2Options::RespectHostInHttpServerNetworkAddress = false;
  97. bool THttp2Options::Set(TStringBuf name, TStringBuf value) {
  98. #define HTTP2_TRY_SET(optType, optName) \
  99. if (name == TStringBuf(#optName)) { \
  100. optName = FromString<optType>(value); \
  101. }
  102. HTTP2_TRY_SET(TDuration, ConnectTimeout)
  103. else HTTP2_TRY_SET(TDuration, InputDeadline)
  104. else HTTP2_TRY_SET(TDuration, OutputDeadline)
  105. else HTTP2_TRY_SET(TDuration, SymptomSlowConnect) else HTTP2_TRY_SET(size_t, InputBufferSize) else HTTP2_TRY_SET(bool, KeepInputBufferForCachedConnections) else HTTP2_TRY_SET(size_t, AsioThreads) else HTTP2_TRY_SET(size_t, AsioServerThreads) else HTTP2_TRY_SET(bool, EnsureSendingCompleteByAck) else HTTP2_TRY_SET(int, Backlog) else HTTP2_TRY_SET(TDuration, ServerInputDeadline) else HTTP2_TRY_SET(TDuration, ServerOutputDeadline) else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMax) else HTTP2_TRY_SET(TDuration, ServerInputDeadlineKeepAliveMin) else HTTP2_TRY_SET(bool, ServerUseDirectWrite) else HTTP2_TRY_SET(bool, UseResponseAsErrorMessage) else HTTP2_TRY_SET(bool, FullHeadersAsErrorMessage) else HTTP2_TRY_SET(bool, ErrorDetailsAsResponseBody) else HTTP2_TRY_SET(bool, RedirectionNotError) else HTTP2_TRY_SET(bool, AnyResponseIsNotError) else HTTP2_TRY_SET(bool, TcpKeepAlive) else HTTP2_TRY_SET(i32, LimitRequestsPerConnection) else HTTP2_TRY_SET(bool, QuickAck)
  106. else HTTP2_TRY_SET(bool, UseAsyncSendRequest) else {
  107. return false;
  108. }
  109. return true;
  110. }
  111. namespace NNeh {
  112. const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType);
  113. }
  114. namespace {
  115. //#define DEBUG_STAT
  116. #ifdef DEBUG_STAT
  117. struct TDebugStat {
  118. static std::atomic<size_t> ConnTotal;
  119. static std::atomic<size_t> ConnActive;
  120. static std::atomic<size_t> ConnCached;
  121. static std::atomic<size_t> ConnDestroyed;
  122. static std::atomic<size_t> ConnFailed;
  123. static std::atomic<size_t> ConnConnCanceled;
  124. static std::atomic<size_t> ConnSlow;
  125. static std::atomic<size_t> Conn2Success;
  126. static std::atomic<size_t> ConnPurgedInCache;
  127. static std::atomic<size_t> ConnDestroyedInCache;
  128. static std::atomic<size_t> RequestTotal;
  129. static std::atomic<size_t> RequestSuccessed;
  130. static std::atomic<size_t> RequestFailed;
  131. static void Print() {
  132. Cout << "ct=" << ConnTotal.load(std::memory_order_acquire)
  133. << " ca=" << ConnActive.load(std::memory_order_acquire)
  134. << " cch=" << ConnCached.load(std::memory_order_acquire)
  135. << " cd=" << ConnDestroyed.load(std::memory_order_acquire)
  136. << " cf=" << ConnFailed.load(std::memory_order_acquire)
  137. << " ccc=" << ConnConnCanceled.load(std::memory_order_acquire)
  138. << " csl=" << ConnSlow.load(std::memory_order_acquire)
  139. << " c2s=" << Conn2Success.load(std::memory_order_acquire)
  140. << " cpc=" << ConnPurgedInCache.load(std::memory_order_acquire)
  141. << " cdc=" << ConnDestroyedInCache.load(std::memory_order_acquire)
  142. << " rt=" << RequestTotal.load(std::memory_order_acquire)
  143. << " rs=" << RequestSuccessed.load(std::memory_order_acquire)
  144. << " rf=" << RequestFailed.load(std::memory_order_acquire)
  145. << Endl;
  146. }
  147. };
  148. std::atomic<size_t> TDebugStat::ConnTotal = 0;
  149. std::atomic<size_t> TDebugStat::ConnActive = 0;
  150. std::atomic<size_t> TDebugStat::ConnCached = 0;
  151. std::atomic<size_t> TDebugStat::ConnDestroyed = 0;
  152. std::atomic<size_t> TDebugStat::ConnFailed = 0;
  153. std::atomic<size_t> TDebugStat::ConnConnCanceled = 0;
  154. std::atomic<size_t> TDebugStat::ConnSlow = 0;
  155. std::atomic<size_t> TDebugStat::Conn2Success = 0;
  156. std::atomic<size_t> TDebugStat::ConnPurgedInCache = 0;
  157. std::atomic<size_t> TDebugStat::ConnDestroyedInCache = 0;
  158. std::atomic<size_t> TDebugStat::RequestTotal = 0;
  159. std::atomic<size_t> TDebugStat::RequestSuccessed = 0;
  160. std::atomic<size_t> TDebugStat::RequestFailed = 0;
  161. #endif
  162. inline void PrepareSocket(SOCKET s, const TRequestSettings& requestSettings = TRequestSettings()) {
  163. if (requestSettings.NoDelay) {
  164. SetNoDelay(s, true);
  165. }
  166. }
  167. bool Compress(TData& data, const TString& compressionScheme) {
  168. if (compressionScheme == "gzip" && data.size() > 23) { // there is no string less than 24 bytes long that might be compressed with gzip
  169. try {
  170. TData gzipped(data.size());
  171. TMemoryOutput out(gzipped.data(), gzipped.size());
  172. TZLibCompress c(&out, ZLib::GZip);
  173. c.Write(data.data(), data.size());
  174. c.Finish();
  175. gzipped.resize(out.Buf() - gzipped.data());
  176. data.swap(gzipped);
  177. return true;
  178. } catch (yexception&) {
  179. // gzipped data occupies more space than original data
  180. }
  181. }
  182. return false;
  183. }
  184. class THttpRequestBuffers: public NAsio::TTcpSocket::IBuffers {
  185. public:
  186. THttpRequestBuffers(TRequestData::TPtr rd)
  187. : Req_(rd)
  188. , Parts_(Req_->Parts())
  189. , IOvec_(Parts_.data(), Parts_.size())
  190. {
  191. }
  192. TContIOVector* GetIOvec() override {
  193. return &IOvec_;
  194. }
  195. private:
  196. TRequestData::TPtr Req_;
  197. TVector<IOutputStream::TPart> Parts_;
  198. TContIOVector IOvec_;
  199. };
  200. struct TRequestGet1: public TRequestGet {
  201. static inline TStringBuf Name() noexcept {
  202. return TStringBuf("http");
  203. }
  204. };
  205. struct TRequestPost1: public TRequestPost {
  206. static inline TStringBuf Name() noexcept {
  207. return TStringBuf("post");
  208. }
  209. };
  210. struct TRequestFull1: public TRequestFull {
  211. static inline TStringBuf Name() noexcept {
  212. return TStringBuf("full");
  213. }
  214. };
  215. struct TRequestGet2: public TRequestGet {
  216. static inline TStringBuf Name() noexcept {
  217. return TStringBuf("http2");
  218. }
  219. };
  220. struct TRequestPost2: public TRequestPost {
  221. static inline TStringBuf Name() noexcept {
  222. return TStringBuf("post2");
  223. }
  224. };
  225. struct TRequestFull2: public TRequestFull {
  226. static inline TStringBuf Name() noexcept {
  227. return TStringBuf("full2");
  228. }
  229. };
  230. struct TRequestUnixSocketGet: public TRequestGet {
  231. static inline TStringBuf Name() noexcept {
  232. return TStringBuf("http+unix");
  233. }
  234. static TRequestSettings RequestSettings() {
  235. return TRequestSettings()
  236. .SetNoDelay(false)
  237. .SetResolverType(EResolverType::EUNIXSOCKET);
  238. }
  239. };
  240. struct TRequestUnixSocketPost: public TRequestPost {
  241. static inline TStringBuf Name() noexcept {
  242. return TStringBuf("post+unix");
  243. }
  244. static TRequestSettings RequestSettings() {
  245. return TRequestSettings()
  246. .SetNoDelay(false)
  247. .SetResolverType(EResolverType::EUNIXSOCKET);
  248. }
  249. };
  250. struct TRequestUnixSocketFull: public TRequestFull {
  251. static inline TStringBuf Name() noexcept {
  252. return TStringBuf("full+unix");
  253. }
  254. static TRequestSettings RequestSettings() {
  255. return TRequestSettings()
  256. .SetNoDelay(false)
  257. .SetResolverType(EResolverType::EUNIXSOCKET);
  258. }
  259. };
  260. typedef TAutoPtr<THttpRequestBuffers> THttpRequestBuffersPtr;
  261. class THttpRequest;
  262. typedef TSharedPtrB<THttpRequest> THttpRequestRef;
  263. class THttpConn;
  264. typedef TIntrusivePtr<THttpConn> THttpConnRef;
  265. typedef std::function<TRequestData::TPtr(const TMessage&, const TParsedLocation&)> TRequestBuilder;
  266. class THttpRequest {
  267. public:
  268. class THandle: public TSimpleHandle {
  269. public:
  270. THandle(IOnRecv* f, const TMessage& msg, TStatCollector* s) noexcept
  271. : TSimpleHandle(f, msg, s)
  272. {
  273. }
  274. bool MessageSendedCompletely() const noexcept override {
  275. if (TSimpleHandle::MessageSendedCompletely()) {
  276. return true;
  277. }
  278. THttpRequestRef req(GetRequest());
  279. if (!!req && req->RequestSendedCompletely()) {
  280. const_cast<THandle*>(this)->SetSendComplete();
  281. }
  282. return TSimpleHandle::MessageSendedCompletely();
  283. }
  284. void Cancel() noexcept override {
  285. if (TSimpleHandle::Canceled()) {
  286. return;
  287. }
  288. THttpRequestRef req(GetRequest());
  289. if (!!req) {
  290. TSimpleHandle::Cancel();
  291. req->Cancel();
  292. }
  293. }
  294. void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
  295. #ifdef DEBUG_STAT
  296. ++TDebugStat::RequestFailed;
  297. #endif
  298. if (rsp) {
  299. TSimpleHandle::NotifyError(error, rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
  300. } else {
  301. TSimpleHandle::NotifyError(error);
  302. }
  303. ReleaseRequest();
  304. }
  305. //not thread safe!
  306. void SetRequest(const TWeakPtrB<THttpRequest>& r) noexcept {
  307. Req_ = r;
  308. }
  309. private:
  310. THttpRequestRef GetRequest() const noexcept {
  311. TGuard<TSpinLock> g(SP_);
  312. return Req_;
  313. }
  314. void ReleaseRequest() noexcept {
  315. TWeakPtrB<THttpRequest> tmp;
  316. TGuard<TSpinLock> g(SP_);
  317. tmp.Swap(Req_);
  318. }
  319. mutable TSpinLock SP_;
  320. TWeakPtrB<THttpRequest> Req_;
  321. };
  322. typedef TIntrusivePtr<THandle> THandleRef;
  323. static void Run(THandleRef& h, const TMessage& msg, TRequestBuilder f, const TRequestSettings& s) {
  324. THttpRequestRef req(new THttpRequest(h, msg, f, s));
  325. req->WeakThis_ = req;
  326. h->SetRequest(req->WeakThis_);
  327. req->Run(req);
  328. }
  329. ~THttpRequest() {
  330. DBGOUT("~THttpRequest()");
  331. }
  332. private:
  333. THttpRequest(THandleRef& h, TMessage msg, TRequestBuilder f, const TRequestSettings& s)
  334. : Hndl_(h)
  335. , RequestBuilder_(f)
  336. , RequestSettings_(s)
  337. , Msg_(std::move(msg))
  338. , Loc_(Msg_.Addr)
  339. , Addr_(Resolve(Loc_.Host, Loc_.GetPort(), RequestSettings_.ResolverType))
  340. , AddrIter_(Addr_->Addr.Begin())
  341. , Canceled_(false)
  342. , RequestSendedCompletely_(false)
  343. {
  344. }
  345. void Run(THttpRequestRef& req);
  346. public:
  347. THttpRequestBuffersPtr BuildRequest() {
  348. return new THttpRequestBuffers(RequestBuilder_(Msg_, Loc_));
  349. }
  350. TRequestSettings RequestSettings() {
  351. return RequestSettings_;
  352. }
  353. //can create a spare socket in an attempt to decrease connecting time
  354. void OnDetectSlowConnecting();
  355. //remove extra connection on success connec
  356. void OnConnect(THttpConn* c);
  357. //have some response input
  358. void OnBeginRead() noexcept {
  359. RequestSendedCompletely_ = true;
  360. }
  361. void OnResponse(TAutoPtr<THttpParser>& rsp);
  362. void OnConnectFailed(THttpConn* c, const TErrorCode& ec);
  363. void OnSystemError(THttpConn* c, const TErrorCode& ec);
  364. void OnError(THttpConn* c, const TString& errorText);
  365. bool RequestSendedCompletely() noexcept;
  366. void Cancel() noexcept;
  367. private:
  368. void NotifyResponse(const TString& resp, const TString& firstLine, const THttpHeaders& headers) {
  369. THandleRef h(ReleaseHandler());
  370. if (!!h) {
  371. h->NotifyResponse(resp, firstLine, headers);
  372. }
  373. }
  374. void NotifyError(
  375. const TString& errorText,
  376. TError::TType errorType = TError::UnknownType,
  377. i32 errorCode = 0, i32 systemErrorCode = 0) {
  378. NotifyError(new TError(errorText, errorType, errorCode, systemErrorCode));
  379. }
  380. void NotifyError(TErrorRef error, const THttpParser* rsp = nullptr) {
  381. THandleRef h(ReleaseHandler());
  382. if (!!h) {
  383. h->NotifyError(error, rsp);
  384. }
  385. }
  386. void Finalize(THttpConn* skipConn = nullptr) noexcept;
  387. inline THandleRef ReleaseHandler() noexcept {
  388. THandleRef h;
  389. {
  390. TGuard<TSpinLock> g(SL_);
  391. h.Swap(Hndl_);
  392. }
  393. return h;
  394. }
  395. inline THttpConnRef GetConn() noexcept {
  396. TGuard<TSpinLock> g(SL_);
  397. return Conn_;
  398. }
  399. inline THttpConnRef ReleaseConn() noexcept {
  400. THttpConnRef c;
  401. {
  402. TGuard<TSpinLock> g(SL_);
  403. c.Swap(Conn_);
  404. }
  405. return c;
  406. }
  407. inline THttpConnRef ReleaseConn2() noexcept {
  408. THttpConnRef c;
  409. {
  410. TGuard<TSpinLock> g(SL_);
  411. c.Swap(Conn2_);
  412. }
  413. return c;
  414. }
  415. TSpinLock SL_; //guaranted calling notify() only once (prevent race between asio thread and current)
  416. THandleRef Hndl_;
  417. TRequestBuilder RequestBuilder_;
  418. TRequestSettings RequestSettings_;
  419. const TMessage Msg_;
  420. const TParsedLocation Loc_;
  421. const TResolvedHost* Addr_;
  422. TNetworkAddress::TIterator AddrIter_;
  423. THttpConnRef Conn_;
  424. THttpConnRef Conn2_; //concurrent connection used, if detected slow connecting on first connection
  425. TWeakPtrB<THttpRequest> WeakThis_;
  426. TAtomicBool Canceled_;
  427. TAtomicBool RequestSendedCompletely_;
  428. };
  429. TAtomicCounter* HttpOutConnCounter();
  430. class THttpConn: public TThrRefBase {
  431. public:
  432. static THttpConnRef Create(TIOService& srv);
  433. ~THttpConn() override {
  434. DBGOUT("~THttpConn()");
  435. Req_.Reset();
  436. HttpOutConnCounter()->Dec();
  437. #ifdef DEBUG_STAT
  438. ++TDebugStat::ConnDestroyed;
  439. #endif
  440. }
  441. void StartRequest(THttpRequestRef req, const TEndpoint& ep, size_t addrId, TDuration slowConn) {
  442. {
  443. //thread safe linking connection->request
  444. TGuard<TSpinLock> g(SL_);
  445. Req_ = req;
  446. }
  447. AddrId_ = addrId;
  448. try {
  449. TDuration connectDeadline(THttp2Options::ConnectTimeout);
  450. if (THttp2Options::ConnectTimeout > slowConn) {
  451. //use append non fatal connect deadline, so on first timedout
  452. //report about slow connecting to THttpRequest, and continue wait ConnectDeadline_ period
  453. connectDeadline = slowConn;
  454. ConnectDeadline_ = THttp2Options::ConnectTimeout - slowConn;
  455. }
  456. DBGOUT("AsyncConnect to " << ep.IpToString());
  457. AS_.AsyncConnect(ep, std::bind(&THttpConn::OnConnect, THttpConnRef(this), _1, _2), connectDeadline);
  458. } catch (...) {
  459. ReleaseRequest();
  460. throw;
  461. }
  462. }
  463. //start next request on keep-alive connection
  464. bool StartNextRequest(THttpRequestRef& req) {
  465. if (Finalized_) {
  466. return false;
  467. }
  468. {
  469. //thread safe linking connection->request
  470. TGuard<TSpinLock> g(SL_);
  471. Req_ = req;
  472. }
  473. RequestWritten_ = false;
  474. BeginReadResponse_ = false;
  475. try {
  476. TErrorCode ec;
  477. SendRequest(req->BuildRequest(), ec); //throw std::bad_alloc
  478. if (ec.Value() == ECANCELED) {
  479. OnCancel();
  480. } else if (ec) {
  481. OnError(ec);
  482. }
  483. } catch (...) {
  484. OnError(CurrentExceptionMessage());
  485. throw;
  486. }
  487. return true;
  488. }
  489. //connection received from cache must be validated before using
  490. //(process removing closed conection from cache consume some time)
  491. inline bool IsValid() const noexcept {
  492. return !Finalized_;
  493. }
  494. void SetCached(bool v) noexcept {
  495. Cached_ = v;
  496. }
  497. void Close() noexcept {
  498. try {
  499. Cancel();
  500. } catch (...) {
  501. }
  502. }
  503. void DetachRequest() noexcept {
  504. ReleaseRequest();
  505. }
  506. void Cancel() { //throw std::bad_alloc
  507. if (!Canceled_) {
  508. Canceled_ = true;
  509. Finalized_ = true;
  510. OnCancel();
  511. AS_.AsyncCancel();
  512. }
  513. }
  514. void OnCancel() {
  515. THttpRequestRef r(ReleaseRequest());
  516. if (!!r) {
  517. static const TString reqCanceled("request canceled");
  518. r->OnError(this, reqCanceled);
  519. }
  520. }
  521. bool RequestSendedCompletely() const noexcept {
  522. DBGOUT("RequestSendedCompletely()");
  523. if (!Connected_ || !RequestWritten_) {
  524. return false;
  525. }
  526. if (BeginReadResponse_) {
  527. return true;
  528. }
  529. #if defined(FIONWRITE)
  530. if (THttp2Options::EnsureSendingCompleteByAck) {
  531. int nbytes = Max<int>();
  532. int err = ioctl(AS_.Native(), FIONWRITE, &nbytes);
  533. return err ? false : nbytes == 0;
  534. }
  535. #endif
  536. return true;
  537. }
  538. TIOService& GetIOService() const noexcept {
  539. return AS_.GetIOService();
  540. }
  541. private:
  542. THttpConn(TIOService& srv)
  543. : AddrId_(0)
  544. , AS_(srv)
  545. , BuffSize_(THttp2Options::InputBufferSize)
  546. , Connected_(false)
  547. , Cached_(false)
  548. , Canceled_(false)
  549. , Finalized_(false)
  550. , InAsyncRead_(false)
  551. , RequestWritten_(false)
  552. , BeginReadResponse_(false)
  553. {
  554. HttpOutConnCounter()->Inc();
  555. }
  556. //can be called only from asio
  557. void OnConnect(const TErrorCode& ec, IHandlingContext& ctx) {
  558. DBGOUT("THttpConn::OnConnect: " << ec.Value());
  559. if (Y_UNLIKELY(ec)) {
  560. if (ec.Value() == ETIMEDOUT && ConnectDeadline_.GetValue()) {
  561. //detect slow connecting (yet not reached final timeout)
  562. DBGOUT("OnConnectTimingCheck");
  563. THttpRequestRef req(GetRequest());
  564. if (!req) {
  565. return; //cancel from client thread can ahead us
  566. }
  567. TDuration newDeadline(ConnectDeadline_);
  568. ConnectDeadline_ = TDuration::Zero(); //next timeout is final
  569. req->OnDetectSlowConnecting();
  570. //continue wait connect
  571. ctx.ContinueUseHandler(newDeadline);
  572. return;
  573. }
  574. #ifdef DEBUG_STAT
  575. if (ec.Value() != ECANCELED) {
  576. ++TDebugStat::ConnFailed;
  577. } else {
  578. ++TDebugStat::ConnConnCanceled;
  579. }
  580. #endif
  581. if (ec.Value() == EIO) {
  582. //try get more detail error info
  583. char buf[1];
  584. TErrorCode errConnect;
  585. AS_.ReadSome(buf, 1, errConnect);
  586. OnConnectFailed(errConnect.Value() ? errConnect : ec);
  587. } else if (ec.Value() == ECANCELED) {
  588. // not try connecting to next host ip addr, simple fail
  589. OnError(ec);
  590. } else {
  591. OnConnectFailed(ec);
  592. }
  593. } else {
  594. Connected_ = true;
  595. THttpRequestRef req(GetRequest());
  596. if (!req || Canceled_) {
  597. return;
  598. }
  599. try {
  600. PrepareSocket(AS_.Native(), req->RequestSettings());
  601. if (THttp2Options::TcpKeepAlive) {
  602. SetKeepAlive(AS_.Native(), true);
  603. }
  604. } catch (TSystemError& err) {
  605. TErrorCode ec2(err.Status());
  606. OnError(ec2);
  607. return;
  608. }
  609. req->OnConnect(this);
  610. THttpRequestBuffersPtr ptr(req->BuildRequest());
  611. PrepareParser();
  612. TErrorCode ec3;
  613. SendRequest(ptr, ec3);
  614. if (ec3) {
  615. OnError(ec3);
  616. }
  617. }
  618. }
  619. void PrepareParser() {
  620. Prs_ = new THttpParser();
  621. Prs_->Prepare();
  622. }
  623. void SendRequest(const THttpRequestBuffersPtr& bfs, TErrorCode& ec) { //throw std::bad_alloc
  624. if (!THttp2Options::UseAsyncSendRequest) {
  625. size_t amount = AS_.WriteSome(*bfs->GetIOvec(), ec);
  626. if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK && ec.Value() != EINPROGRESS) {
  627. return;
  628. }
  629. ec.Assign(0);
  630. bfs->GetIOvec()->Proceed(amount);
  631. if (bfs->GetIOvec()->Complete()) {
  632. RequestWritten_ = true;
  633. StartRead();
  634. } else {
  635. SendRequestAsync(bfs);
  636. }
  637. } else {
  638. SendRequestAsync(bfs);
  639. }
  640. }
  641. void SendRequestAsync(const THttpRequestBuffersPtr& bfs) {
  642. NAsio::TTcpSocket::TSendedData sd(bfs.Release());
  643. AS_.AsyncWrite(sd, std::bind(&THttpConn::OnWrite, THttpConnRef(this), _1, _2, _3), THttp2Options::OutputDeadline);
  644. }
  645. void OnWrite(const TErrorCode& err, size_t amount, IHandlingContext& ctx) {
  646. Y_UNUSED(amount);
  647. Y_UNUSED(ctx);
  648. if (err) {
  649. OnError(err);
  650. } else {
  651. DBGOUT("OnWrite()");
  652. RequestWritten_ = true;
  653. StartRead();
  654. }
  655. }
  656. inline void StartRead() {
  657. if (!InAsyncRead_ && !Canceled_) {
  658. InAsyncRead_ = true;
  659. AS_.AsyncPollRead(std::bind(&THttpConn::OnCanRead, THttpConnRef(this), _1, _2), THttp2Options::InputDeadline);
  660. }
  661. }
  662. //can be called only from asio
  663. void OnReadSome(const TErrorCode& err, size_t bytes, IHandlingContext& ctx) {
  664. if (Y_UNLIKELY(err)) {
  665. OnError(err);
  666. return;
  667. }
  668. if (!BeginReadResponse_) {
  669. //used in MessageSendedCompletely()
  670. BeginReadResponse_ = true;
  671. THttpRequestRef r(GetRequest());
  672. if (!!r) {
  673. r->OnBeginRead();
  674. }
  675. }
  676. DBGOUT("receive:" << TStringBuf(Buff_.Get(), bytes));
  677. try {
  678. if (!Prs_) {
  679. throw yexception() << TStringBuf("receive some data while not in request");
  680. }
  681. #if defined(_linux_)
  682. if (THttp2Options::QuickAck) {
  683. SetSockOpt(AS_.Native(), SOL_TCP, TCP_QUICKACK, (int)1);
  684. }
  685. #endif
  686. DBGOUT("parse:");
  687. while (!Prs_->Parse(Buff_.Get(), bytes)) {
  688. if (BuffSize_ == bytes) {
  689. TErrorCode ec;
  690. bytes = AS_.ReadSome(Buff_.Get(), BuffSize_, ec);
  691. if (!ec) {
  692. continue;
  693. }
  694. if (ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
  695. OnError(ec);
  696. return;
  697. }
  698. }
  699. //continue async. read from socket
  700. ctx.ContinueUseHandler(THttp2Options::InputDeadline);
  701. return;
  702. }
  703. //succesfully reach end of http response
  704. THttpRequestRef r(ReleaseRequest());
  705. if (!r) {
  706. //lost race to req. canceling
  707. DBGOUT("connection failed");
  708. return;
  709. }
  710. DBGOUT("response:");
  711. bool keepALive = Prs_->IsKeepAlive();
  712. r->OnResponse(Prs_);
  713. if (!keepALive) {
  714. return;
  715. }
  716. //continue use connection (keep-alive mode)
  717. PrepareParser();
  718. if (!THttp2Options::KeepInputBufferForCachedConnections) {
  719. Buff_.Destroy();
  720. }
  721. //continue async. read from socket
  722. ctx.ContinueUseHandler(THttp2Options::InputDeadline);
  723. PutSelfToCache();
  724. } catch (...) {
  725. OnError(CurrentExceptionMessage());
  726. }
  727. }
  728. void PutSelfToCache();
  729. //method for reaction on input data for re-used keep-alive connection,
  730. //which free/release buffer when was placed in cache
  731. void OnCanRead(const TErrorCode& err, IHandlingContext& ctx) {
  732. if (Y_UNLIKELY(err)) {
  733. OnError(err);
  734. } else {
  735. if (!Buff_) {
  736. Buff_.Reset(new char[BuffSize_]);
  737. }
  738. TErrorCode ec;
  739. OnReadSome(ec, AS_.ReadSome(Buff_.Get(), BuffSize_, ec), ctx);
  740. }
  741. }
  742. //unlink connection and request, thread-safe mark connection as non valid
  743. inline THttpRequestRef GetRequest() noexcept {
  744. TGuard<TSpinLock> g(SL_);
  745. return Req_;
  746. }
  747. inline THttpRequestRef ReleaseRequest() noexcept {
  748. THttpRequestRef r;
  749. {
  750. TGuard<TSpinLock> g(SL_);
  751. r.Swap(Req_);
  752. }
  753. return r;
  754. }
  755. void OnConnectFailed(const TErrorCode& ec);
  756. inline void OnError(const TErrorCode& ec) {
  757. OnError(ec.Text());
  758. }
  759. inline void OnError(const TString& errText);
  760. size_t AddrId_;
  761. NAsio::TTcpSocket AS_;
  762. TArrayHolder<char> Buff_; //input buffer
  763. const size_t BuffSize_;
  764. TAutoPtr<THttpParser> Prs_; //input data parser & parsed info storage
  765. TSpinLock SL_;
  766. THttpRequestRef Req_; //current request
  767. TDuration ConnectDeadline_;
  768. TAtomicBool Connected_;
  769. TAtomicBool Cached_;
  770. TAtomicBool Canceled_;
  771. TAtomicBool Finalized_;
  772. bool InAsyncRead_;
  773. TAtomicBool RequestWritten_;
  774. TAtomicBool BeginReadResponse_;
  775. };
  776. //conn limits monitoring, cache clean, contain used in http clients asio threads/executors
  777. class THttpConnManager: public IThreadFactory::IThreadAble {
  778. public:
  779. THttpConnManager()
  780. : TotalConn(0)
  781. , EP_(THttp2Options::AsioThreads)
  782. , InPurging_(0)
  783. , MaxConnId_(0)
  784. , Shutdown_(false)
  785. {
  786. T_ = SystemThreadFactory()->Run(this);
  787. Limits.SetSoft(40000);
  788. Limits.SetHard(50000);
  789. }
  790. ~THttpConnManager() override {
  791. {
  792. TGuard<TMutex> g(PurgeMutex_);
  793. Shutdown_ = true;
  794. CondPurge_.Signal();
  795. }
  796. EP_.SyncShutdown();
  797. T_->Join();
  798. }
  799. inline void SetLimits(size_t softLimit, size_t hardLimit) noexcept {
  800. Limits.SetSoft(softLimit);
  801. Limits.SetHard(hardLimit);
  802. }
  803. inline std::pair<size_t, size_t> GetLimits() const noexcept {
  804. return {Limits.Soft(), Limits.Hard()};
  805. }
  806. inline void CheckLimits() {
  807. if (ExceedSoftLimit()) {
  808. SuggestPurgeCache();
  809. if (ExceedHardLimit()) {
  810. Y_ABORT("neh::http2 output connections limit reached");
  811. //ythrow yexception() << "neh::http2 output connections limit reached";
  812. }
  813. }
  814. }
  815. inline bool Get(THttpConnRef& conn, size_t addrId) {
  816. #ifdef DEBUG_STAT
  817. TDebugStat::ConnTotal.store(TotalConn.Val(), std::memory_order_release);
  818. TDebugStat::ConnActive.store(Active(), std::memory_order_release);
  819. TDebugStat::ConnCached.store(Cache_.Size(), std::memory_order_release);
  820. #endif
  821. return Cache_.Get(conn, addrId);
  822. }
  823. inline void Put(THttpConnRef& conn, size_t addrId) {
  824. if (Y_LIKELY(!Shutdown_ && !ExceedHardLimit() && !CacheDisabled())) {
  825. if (Y_UNLIKELY(addrId > (size_t)AtomicGet(MaxConnId_))) {
  826. AtomicSet(MaxConnId_, addrId);
  827. }
  828. Cache_.Put(conn, addrId);
  829. } else {
  830. conn->Close();
  831. conn.Drop();
  832. }
  833. }
  834. inline size_t OnConnError(size_t addrId) {
  835. return Cache_.Validate(addrId);
  836. }
  837. TIOService& GetIOService() {
  838. return EP_.GetExecutor().GetIOService();
  839. }
  840. bool CacheDisabled() const {
  841. return Limits.Soft() == 0;
  842. }
  843. bool IsShutdown() const noexcept {
  844. return Shutdown_;
  845. }
  846. TAtomicCounter TotalConn;
  847. private:
  848. inline size_t Total() const noexcept {
  849. return TotalConn.Val();
  850. }
  851. inline size_t Active() const noexcept {
  852. return TFdLimits::ExceedLimit(Total(), Cache_.Size());
  853. }
  854. inline size_t ExceedSoftLimit() const noexcept {
  855. return TFdLimits::ExceedLimit(Total(), Limits.Soft());
  856. }
  857. inline size_t ExceedHardLimit() const noexcept {
  858. return TFdLimits::ExceedLimit(Total(), Limits.Hard());
  859. }
  860. void SuggestPurgeCache() {
  861. if (AtomicTryLock(&InPurging_)) {
  862. //evaluate the usefulness of purging the cache
  863. //если в кеше мало соединений (< MaxConnId_/16 или 64), не чистим кеш
  864. if (Cache_.Size() > (Min((size_t)AtomicGet(MaxConnId_), (size_t)1024U) >> 4)) {
  865. //по мере приближения к hardlimit нужда в чистке cache приближается к 100%
  866. size_t closenessToHardLimit256 = ((Active() + 1) << 8) / (Limits.Delta() + 1);
  867. //чем больше соединений в кеше, а не в работе, тем менее нужен кеш (можно его почистить)
  868. size_t cacheUselessness256 = ((Cache_.Size() + 1) << 8) / (Active() + 1);
  869. //итого, - пороги срабатывания:
  870. //при достижении soft-limit, если соединения в кеше, а не в работе
  871. //на полпути от soft-limit к hard-limit, если в кеше больше половины соединений
  872. //при приближении к hardlimit пытаться почистить кеш почти постоянно
  873. if ((closenessToHardLimit256 + cacheUselessness256) >= 256U) {
  874. TGuard<TMutex> g(PurgeMutex_);
  875. CondPurge_.Signal();
  876. return; //memo: thread MUST unlock InPurging_ (see DoExecute())
  877. }
  878. }
  879. AtomicUnlock(&InPurging_);
  880. }
  881. }
  882. void DoExecute() override {
  883. TThread::SetCurrentThreadName("NehHttpConnMngr");
  884. while (true) {
  885. {
  886. TGuard<TMutex> g(PurgeMutex_);
  887. if (Shutdown_)
  888. return;
  889. CondPurge_.WaitI(PurgeMutex_);
  890. }
  891. PurgeCache();
  892. AtomicUnlock(&InPurging_);
  893. }
  894. }
  895. void PurgeCache() noexcept {
  896. //try remove at least ExceedSoftLimit() oldest connections from cache
  897. //вычисляем долю кеша, которую нужно почистить (в 256 долях) (но не менее 1/32 кеша)
  898. size_t frac256 = Min(size_t(Max(size_t(256U / 32U), (ExceedSoftLimit() << 8) / (Cache_.Size() + 1))), (size_t)256U);
  899. size_t processed = 0;
  900. size_t maxConnId = AtomicGet(MaxConnId_);
  901. for (size_t i = 0; i <= maxConnId && !Shutdown_; ++i) {
  902. processed += Cache_.Purge(i, frac256);
  903. if (processed > 32) {
  904. #ifdef DEBUG_STAT
  905. TDebugStat::ConnPurgedInCache += processed;
  906. #endif
  907. processed = 0;
  908. Sleep(TDuration::MilliSeconds(10)); //prevent big spike cpu/system usage
  909. }
  910. }
  911. }
  912. TFdLimits Limits;
  913. TExecutorsPool EP_;
  914. TConnCache<THttpConn> Cache_;
  915. TAtomic InPurging_;
  916. TAtomic MaxConnId_;
  917. TAutoPtr<IThreadFactory::IThread> T_;
  918. TCondVar CondPurge_;
  919. TMutex PurgeMutex_;
  920. TAtomicBool Shutdown_;
  921. };
  922. THttpConnManager* HttpConnManager() {
  923. return Singleton<THttpConnManager>();
  924. }
  925. TAtomicCounter* HttpOutConnCounter() {
  926. return &HttpConnManager()->TotalConn;
  927. }
  928. THttpConnRef THttpConn::Create(TIOService& srv) {
  929. if (HttpConnManager()->IsShutdown()) {
  930. throw yexception() << "can't create connection with shutdowned service";
  931. }
  932. return new THttpConn(srv);
  933. }
  934. void THttpConn::PutSelfToCache() {
  935. THttpConnRef c(this);
  936. HttpConnManager()->Put(c, AddrId_);
  937. }
  938. void THttpConn::OnConnectFailed(const TErrorCode& ec) {
  939. THttpRequestRef r(GetRequest());
  940. if (!!r) {
  941. r->OnConnectFailed(this, ec);
  942. }
  943. OnError(ec);
  944. }
  945. void THttpConn::OnError(const TString& errText) {
  946. Finalized_ = true;
  947. if (Connected_) {
  948. Connected_ = false;
  949. TErrorCode ec;
  950. AS_.Shutdown(NAsio::TTcpSocket::ShutdownBoth, ec);
  951. } else {
  952. if (AS_.IsOpen()) {
  953. AS_.AsyncCancel();
  954. }
  955. }
  956. THttpRequestRef r(ReleaseRequest());
  957. if (!!r) {
  958. r->OnError(this, errText);
  959. } else {
  960. if (Cached_) {
  961. size_t res = HttpConnManager()->OnConnError(AddrId_);
  962. Y_UNUSED(res);
  963. #ifdef DEBUG_STAT
  964. TDebugStat::ConnDestroyedInCache += res;
  965. #endif
  966. }
  967. }
  968. }
  969. void THttpRequest::Run(THttpRequestRef& req) {
  970. #ifdef DEBUG_STAT
  971. if ((++TDebugStat::RequestTotal & 0xFFF) == 0) {
  972. TDebugStat::Print();
  973. }
  974. #endif
  975. try {
  976. while (!Canceled_) {
  977. THttpConnRef conn;
  978. if (HttpConnManager()->Get(conn, Addr_->Id)) {
  979. DBGOUT("Use connection from cache");
  980. Conn_ = conn; //thread magic
  981. if (!conn->StartNextRequest(req)) {
  982. continue; //if use connection from cache, ignore write error and try another conn
  983. }
  984. } else {
  985. HttpConnManager()->CheckLimits(); //here throw exception if reach hard limit (or atexit() state)
  986. Conn_ = THttpConn::Create(HttpConnManager()->GetIOService());
  987. TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
  988. Conn_->StartRequest(req, ep, Addr_->Id, THttp2Options::SymptomSlowConnect); // can throw
  989. }
  990. break;
  991. }
  992. } catch (...) {
  993. Conn_.Reset();
  994. throw;
  995. }
  996. }
  997. //it seems we have lost TCP SYN packet, create extra connection for decrease response time
  998. void THttpRequest::OnDetectSlowConnecting() {
  999. #ifdef DEBUG_STAT
  1000. ++TDebugStat::ConnSlow;
  1001. #endif
  1002. //use some io_service (Run() thread-executor), from first conn. for more thread safety
  1003. THttpConnRef conn = GetConn();
  1004. if (!conn) {
  1005. return;
  1006. }
  1007. THttpConnRef conn2;
  1008. try {
  1009. conn2 = THttpConn::Create(conn->GetIOService());
  1010. } catch (...) {
  1011. return; // cant create spare connection, simple continue use only main
  1012. }
  1013. {
  1014. TGuard<TSpinLock> g(SL_);
  1015. Conn2_ = conn2;
  1016. }
  1017. if (Y_UNLIKELY(Canceled_)) {
  1018. ReleaseConn2();
  1019. } else {
  1020. //use connect timeout for disable detecting slow connecting on second conn.
  1021. TEndpoint ep(new NAddr::TAddrInfo(&*Addr_->Addr.Begin()));
  1022. try {
  1023. conn2->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::ConnectTimeout);
  1024. } catch (...) {
  1025. // ignore errors on spare connection
  1026. ReleaseConn2();
  1027. }
  1028. }
  1029. }
  1030. void THttpRequest::OnConnect(THttpConn* c) {
  1031. THttpConnRef extraConn;
  1032. {
  1033. TGuard<TSpinLock> g(SL_);
  1034. if (Y_UNLIKELY(!!Conn2_)) {
  1035. //has pair concurrent conn, 'should stay only one'
  1036. if (Conn2_.Get() == c) {
  1037. #ifdef DEBUG_STAT
  1038. ++TDebugStat::Conn2Success;
  1039. #endif
  1040. Conn2_.Swap(Conn_);
  1041. }
  1042. extraConn.Swap(Conn2_);
  1043. }
  1044. }
  1045. if (!!extraConn) {
  1046. extraConn->DetachRequest(); //prevent call OnError()
  1047. extraConn->Close();
  1048. }
  1049. }
  1050. void THttpRequest::OnResponse(TAutoPtr<THttpParser>& rsp) {
  1051. DBGOUT("THttpRequest::OnResponse()");
  1052. ReleaseConn();
  1053. if (Y_LIKELY(((rsp->RetCode() >= 200 && rsp->RetCode() < (!THttp2Options::RedirectionNotError ? 300 : 400)) || THttp2Options::AnyResponseIsNotError))) {
  1054. NotifyResponse(rsp->DecodedContent(), rsp->FirstLine(), rsp->Headers());
  1055. } else {
  1056. TString message;
  1057. if (THttp2Options::FullHeadersAsErrorMessage) {
  1058. TStringStream err;
  1059. err << rsp->FirstLine();
  1060. THttpHeaders hdrs = rsp->Headers();
  1061. for (auto h = hdrs.begin(); h < hdrs.end(); h++) {
  1062. err << h->ToString() << TStringBuf("\r\n");
  1063. }
  1064. message = err.Str();
  1065. } else if (THttp2Options::UseResponseAsErrorMessage) {
  1066. message = rsp->DecodedContent();
  1067. } else {
  1068. TStringStream err;
  1069. err << TStringBuf("request failed(") << rsp->FirstLine() << TStringBuf(")");
  1070. message = err.Str();
  1071. }
  1072. NotifyError(new TError(message, TError::ProtocolSpecific, rsp->RetCode()), rsp.Get());
  1073. }
  1074. }
  1075. void THttpRequest::OnConnectFailed(THttpConn* c, const TErrorCode& ec) {
  1076. DBGOUT("THttpRequest::OnConnectFailed()");
  1077. //detach/discard failed conn, try connect to next ip addr (if can)
  1078. THttpConnRef cc(GetConn());
  1079. if (c != cc.Get() || AddrIter_ == Addr_->Addr.End() || ++AddrIter_ == Addr_->Addr.End() || Canceled_) {
  1080. return OnSystemError(c, ec);
  1081. }
  1082. // can try next host addr
  1083. c->DetachRequest();
  1084. c->Close();
  1085. THttpConnRef nextConn;
  1086. try {
  1087. nextConn = THttpConn::Create(HttpConnManager()->GetIOService());
  1088. } catch (...) {
  1089. OnSystemError(nullptr, ec);
  1090. return;
  1091. }
  1092. {
  1093. THttpConnRef nc = nextConn;
  1094. TGuard<TSpinLock> g(SL_);
  1095. Conn_.Swap(nc);
  1096. }
  1097. TEndpoint ep(new NAddr::TAddrInfo(&*AddrIter_));
  1098. try {
  1099. nextConn->StartRequest(WeakThis_, ep, Addr_->Id, THttp2Options::SymptomSlowConnect);
  1100. } catch (...) {
  1101. OnError(nullptr, CurrentExceptionMessage());
  1102. return;
  1103. }
  1104. if (Canceled_) {
  1105. OnError(nullptr, "canceled");
  1106. }
  1107. }
  1108. void THttpRequest::OnSystemError(THttpConn* c, const TErrorCode& ec) {
  1109. DBGOUT("THttpRequest::OnSystemError()");
  1110. NotifyError(ec.Text(), TError::TType::UnknownType, 0, ec.Value());
  1111. Finalize(c);
  1112. }
  1113. void THttpRequest::OnError(THttpConn* c, const TString& errorText) {
  1114. DBGOUT("THttpRequest::OnError()");
  1115. NotifyError(errorText);
  1116. Finalize(c);
  1117. }
  1118. bool THttpRequest::RequestSendedCompletely() noexcept {
  1119. if (RequestSendedCompletely_) {
  1120. return true;
  1121. }
  1122. THttpConnRef c(GetConn());
  1123. return !!c ? c->RequestSendedCompletely() : false;
  1124. }
  1125. void THttpRequest::Cancel() noexcept {
  1126. if (!Canceled_) {
  1127. Canceled_ = true;
  1128. try {
  1129. static const TString canceled("Canceled");
  1130. NotifyError(canceled, TError::Cancelled);
  1131. Finalize();
  1132. } catch (...) {
  1133. }
  1134. }
  1135. }
  1136. inline void FinalizeConn(THttpConnRef& c, THttpConn* skipConn) noexcept {
  1137. if (!!c && c.Get() != skipConn) {
  1138. c->DetachRequest();
  1139. c->Close();
  1140. }
  1141. }
  1142. void THttpRequest::Finalize(THttpConn* skipConn) noexcept {
  1143. THttpConnRef c1(ReleaseConn());
  1144. FinalizeConn(c1, skipConn);
  1145. THttpConnRef c2(ReleaseConn2());
  1146. FinalizeConn(c2, skipConn);
  1147. }
  1148. /////////////////////////////////// server side ////////////////////////////////////
  1149. TAtomicCounter* HttpInConnCounter() {
  1150. return Singleton<TAtomicCounter>();
  1151. }
  1152. TFdLimits* HttpInConnLimits() {
  1153. return Singleton<TFdLimits>();
  1154. }
  1155. class THttpServer: public IRequester {
  1156. typedef TAutoPtr<TTcpAcceptor> TTcpAcceptorPtr;
  1157. typedef TAtomicSharedPtr<TTcpSocket> TTcpSocketRef;
  1158. class TConn;
  1159. typedef TSharedPtrB<TConn> TConnRef;
  1160. class TRequest: public IHttpRequest {
  1161. public:
  1162. TRequest(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser>& p)
  1163. : C_(c)
  1164. , P_(p)
  1165. , RemoteHost_(C_->RemoteHost())
  1166. , CompressionScheme_(P_->GetBestCompressionScheme())
  1167. , H_(TStringBuf(P_->FirstLine()))
  1168. {
  1169. }
  1170. ~TRequest() override {
  1171. if (!!C_) {
  1172. try {
  1173. C_->SendError(Id(), 503, "service unavailable (request ignored)", P_->HttpVersion(), {});
  1174. } catch (...) {
  1175. DBGOUT("~TRequest()::SendFail() exception");
  1176. }
  1177. }
  1178. }
  1179. TAtomicBase Id() const {
  1180. return Id_;
  1181. }
  1182. protected:
  1183. TStringBuf Scheme() const override {
  1184. return TStringBuf("http");
  1185. }
  1186. TString RemoteHost() const override {
  1187. return RemoteHost_;
  1188. }
  1189. TStringBuf Service() const override {
  1190. return TStringBuf(H_.Path).Skip(1);
  1191. }
  1192. const THttpHeaders& Headers() const override {
  1193. return P_->Headers();
  1194. }
  1195. TStringBuf Method() const override {
  1196. return H_.Method;
  1197. }
  1198. TStringBuf Body() const override {
  1199. return P_->DecodedContent();
  1200. }
  1201. TStringBuf Cgi() const override {
  1202. return H_.Cgi;
  1203. }
  1204. TStringBuf RequestId() const override {
  1205. return TStringBuf();
  1206. }
  1207. bool Canceled() const override {
  1208. if (!C_) {
  1209. return false;
  1210. }
  1211. return C_->IsCanceled();
  1212. }
  1213. void SendReply(TData& data) override {
  1214. SendReply(data, TString(), HttpCodes::HTTP_OK);
  1215. }
  1216. void SendReply(TData& data, const TString& headers, int httpCode) override {
  1217. if (!!C_) {
  1218. C_->Send(Id(), data, CompressionScheme_, P_->HttpVersion(), headers, httpCode);
  1219. C_.Reset();
  1220. }
  1221. }
  1222. void SendError(TResponseError err, const THttpErrorDetails& details) override {
  1223. static const unsigned errorToHttpCode[IRequest::MaxResponseError] =
  1224. {
  1225. 400,
  1226. 403,
  1227. 404,
  1228. 429,
  1229. 500,
  1230. 501,
  1231. 502,
  1232. 503,
  1233. 509};
  1234. if (!!C_) {
  1235. C_->SendError(Id(), errorToHttpCode[err], details.Details, P_->HttpVersion(), details.Headers);
  1236. C_.Reset();
  1237. }
  1238. }
  1239. static TAtomicBase NextId() {
  1240. static TAtomic idGenerator = 0;
  1241. TAtomicBase id = 0;
  1242. do {
  1243. id = AtomicIncrement(idGenerator);
  1244. } while (!id);
  1245. return id;
  1246. }
  1247. TSharedPtrB<TConn> C_;
  1248. TAutoPtr<THttpParser> P_;
  1249. TString RemoteHost_;
  1250. TString CompressionScheme_;
  1251. TParsedHttpFull H_;
  1252. TAtomicBase Id_ = NextId();
  1253. };
  1254. class TRequestGet: public TRequest {
  1255. public:
  1256. TRequestGet(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
  1257. : TRequest(c, p)
  1258. {
  1259. }
  1260. TStringBuf Data() const override {
  1261. return H_.Cgi;
  1262. }
  1263. };
  1264. class TRequestPost: public TRequest {
  1265. public:
  1266. TRequestPost(TWeakPtrB<TConn>& c, TAutoPtr<THttpParser> p)
  1267. : TRequest(c, p)
  1268. {
  1269. }
  1270. TStringBuf Data() const override {
  1271. return P_->DecodedContent();
  1272. }
  1273. };
  1274. class TConn {
  1275. private:
  1276. TConn(THttpServer& hs, const TTcpSocketRef& s)
  1277. : HS_(hs)
  1278. , AS_(s)
  1279. , RemoteHost_(NNeh::PrintHostByRfc(*AS_->RemoteEndpoint().Addr()))
  1280. , BuffSize_(THttp2Options::InputBufferSize)
  1281. , Buff_(new char[BuffSize_])
  1282. , Canceled_(false)
  1283. , LeftRequestsToDisconnect_(hs.LimitRequestsPerConnection)
  1284. {
  1285. DBGOUT("THttpServer::TConn()");
  1286. HS_.OnCreateConn();
  1287. }
  1288. inline TConnRef SelfRef() noexcept {
  1289. return WeakThis_;
  1290. }
  1291. public:
  1292. static void Create(THttpServer& hs, const TTcpSocketRef& s) {
  1293. TSharedPtrB<TConn> conn(new TConn(hs, s));
  1294. conn->WeakThis_ = conn;
  1295. conn->ExpectNewRequest();
  1296. conn->AS_->AsyncPollRead(std::bind(&TConn::OnCanRead, conn, _1, _2), THttp2Options::ServerInputDeadline);
  1297. }
  1298. ~TConn() {
  1299. DBGOUT("~THttpServer::TConn(" << (!AS_ ? -666 : AS_->Native()) << ")");
  1300. HS_.OnDestroyConn();
  1301. }
  1302. private:
  1303. void ExpectNewRequest() {
  1304. P_.Reset(new THttpParser(THttpParser::Request));
  1305. P_->Prepare();
  1306. }
  1307. void OnCanRead(const TErrorCode& ec, IHandlingContext& ctx) {
  1308. if (ec) {
  1309. OnError();
  1310. } else {
  1311. TErrorCode ec2;
  1312. OnReadSome(ec2, AS_->ReadSome(Buff_.Get(), BuffSize_, ec2), ctx);
  1313. }
  1314. }
  1315. void OnError() {
  1316. DBGOUT("Srv OnError(" << (!AS_ ? -666 : AS_->Native()) << ")");
  1317. Canceled_ = true;
  1318. AS_->AsyncCancel();
  1319. }
  1320. void OnReadSome(const TErrorCode& ec, size_t amount, IHandlingContext& ctx) {
  1321. if (ec || !amount) {
  1322. OnError();
  1323. return;
  1324. }
  1325. DBGOUT("ReadSome(" << (!AS_ ? -666 : AS_->Native()) << "): " << amount);
  1326. try {
  1327. size_t buffPos = 0;
  1328. //DBGOUT("receive and parse: " << TStringBuf(Buff_.Get(), amount));
  1329. while (P_->Parse(Buff_.Get() + buffPos, amount - buffPos)) {
  1330. if (!P_->IsKeepAlive() || LeftRequestsToDisconnect_ == 1) {
  1331. SeenMessageWithoutKeepalive_ = true;
  1332. }
  1333. char rt = *P_->FirstLine().data();
  1334. const size_t extraDataSize = P_->GetExtraDataSize();
  1335. if (rt == 'P' || rt == 'p') {
  1336. OnRequest(new TRequestPost(WeakThis_, P_));
  1337. } else {
  1338. OnRequest(new TRequestGet(WeakThis_, P_));
  1339. }
  1340. if (extraDataSize) {
  1341. // has http pipelining
  1342. buffPos = amount - extraDataSize;
  1343. ExpectNewRequest();
  1344. } else {
  1345. ExpectNewRequest();
  1346. ctx.ContinueUseHandler(HS_.GetKeepAliveTimeout());
  1347. return;
  1348. }
  1349. }
  1350. ctx.ContinueUseHandler(THttp2Options::ServerInputDeadline);
  1351. } catch (...) {
  1352. OnError();
  1353. }
  1354. }
  1355. void OnRequest(TRequest* r) {
  1356. DBGOUT("OnRequest()");
  1357. if (AtomicGet(PrimaryResponse_)) {
  1358. // has pipelining
  1359. PipelineOrder_.Enqueue(r->Id());
  1360. } else {
  1361. AtomicSet(PrimaryResponse_, r->Id());
  1362. }
  1363. HS_.OnRequest(r);
  1364. OnRequestDone();
  1365. }
  1366. void OnRequestDone() {
  1367. DBGOUT("OnRequestDone()");
  1368. if (LeftRequestsToDisconnect_ > 0) {
  1369. --LeftRequestsToDisconnect_;
  1370. }
  1371. }
  1372. static void PrintHttpVersion(IOutputStream& out, const THttpVersion& ver) {
  1373. out << TStringBuf("HTTP/") << ver.Major << TStringBuf(".") << ver.Minor;
  1374. }
  1375. struct TResponseData : TThrRefBase {
  1376. TResponseData(size_t reqId, TTcpSocket::TSendedData data)
  1377. : RequestId_(reqId)
  1378. , Data_(data)
  1379. {
  1380. }
  1381. size_t RequestId_;
  1382. TTcpSocket::TSendedData Data_;
  1383. };
  1384. typedef TIntrusivePtr<TResponseData> TResponseDataRef;
  1385. public:
  1386. //called non thread-safe (from outside thread)
  1387. void Send(TAtomicBase requestId, TData& data, const TString& compressionScheme, const THttpVersion& ver, const TString& headers, int httpCode) {
  1388. class THttpResponseFormatter {
  1389. public:
  1390. THttpResponseFormatter(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection) {
  1391. Header.Reserve(128 + contentEncoding.size() + theHeaders.size());
  1392. PrintHttpVersion(Header, theVer);
  1393. Header << TStringBuf(" ") << theHttpCode << ' ' << HttpCodeStr(theHttpCode);
  1394. if (Compress(theData, contentEncoding)) {
  1395. Header << TStringBuf("\r\nContent-Encoding: ") << contentEncoding;
  1396. }
  1397. Header << TStringBuf("\r\nContent-Length: ") << theData.size();
  1398. if (closeConnection) {
  1399. Header << TStringBuf("\r\nConnection: close");
  1400. } else if (Y_LIKELY(theVer.Major > 1 || theVer.Minor > 0)) {
  1401. // since HTTP/1.1 Keep-Alive is default behaviour
  1402. Header << TStringBuf("\r\nConnection: Keep-Alive");
  1403. }
  1404. if (theHeaders) {
  1405. Header << theHeaders;
  1406. }
  1407. Header << TStringBuf("\r\n\r\n");
  1408. Body.swap(theData);
  1409. Parts[0].buf = Header.Data();
  1410. Parts[0].len = Header.Size();
  1411. Parts[1].buf = Body.data();
  1412. Parts[1].len = Body.size();
  1413. }
  1414. TStringStream Header;
  1415. TData Body;
  1416. IOutputStream::TPart Parts[2];
  1417. };
  1418. class TBuffers: public THttpResponseFormatter, public TTcpSocket::IBuffers {
  1419. public:
  1420. TBuffers(TData& theData, const TString& contentEncoding, const THttpVersion& theVer, const TString& theHeaders, int theHttpCode, bool closeConnection)
  1421. : THttpResponseFormatter(theData, contentEncoding, theVer, theHeaders, theHttpCode, closeConnection)
  1422. , IOVec(Parts, 2)
  1423. {
  1424. }
  1425. TContIOVector* GetIOvec() override {
  1426. return &IOVec;
  1427. }
  1428. TContIOVector IOVec;
  1429. };
  1430. TTcpSocket::TSendedData sd(new TBuffers(data, compressionScheme, ver, headers, httpCode, SeenMessageWithoutKeepalive_));
  1431. SendData(requestId, sd);
  1432. }
  1433. //called non thread-safe (from outside thread)
  1434. void SendError(TAtomicBase requestId, unsigned httpCode, const TString& descr, const THttpVersion& ver, const TString& headers) {
  1435. if (Canceled_) {
  1436. return;
  1437. }
  1438. class THttpErrorResponseFormatter {
  1439. public:
  1440. THttpErrorResponseFormatter(unsigned theHttpCode, const TString& theDescr, const THttpVersion& theVer, bool closeConnection, const TString& headers) {
  1441. PrintHttpVersion(Answer, theVer);
  1442. Answer << TStringBuf(" ") << theHttpCode << TStringBuf(" ");
  1443. if (theDescr.size() && !THttp2Options::ErrorDetailsAsResponseBody) {
  1444. // Reason-Phrase = *<TEXT, excluding CR, LF>
  1445. // replace bad chars to '.'
  1446. TString reasonPhrase = theDescr;
  1447. for (TString::iterator it = reasonPhrase.begin(); it != reasonPhrase.end(); ++it) {
  1448. char& ch = *it;
  1449. if (ch == ' ') {
  1450. continue;
  1451. }
  1452. if (((ch & 31) == ch) || static_cast<unsigned>(ch) == 127 || (static_cast<unsigned>(ch) & 0x80)) {
  1453. //CTLs || DEL(127) || non ascii
  1454. // (ch <= 32) || (ch >= 127)
  1455. ch = '.';
  1456. }
  1457. }
  1458. Answer << reasonPhrase;
  1459. } else {
  1460. Answer << HttpCodeStr(static_cast<int>(theHttpCode));
  1461. }
  1462. if (closeConnection) {
  1463. Answer << TStringBuf("\r\nConnection: close");
  1464. }
  1465. if (headers) {
  1466. Answer << "\r\n" << headers;
  1467. }
  1468. if (THttp2Options::ErrorDetailsAsResponseBody) {
  1469. Answer << TStringBuf("\r\nContent-Length:") << theDescr.size() << "\r\n\r\n" << theDescr;
  1470. } else {
  1471. Answer << "\r\n"
  1472. "Content-Length:0\r\n\r\n"sv;
  1473. }
  1474. Parts[0].buf = Answer.Data();
  1475. Parts[0].len = Answer.Size();
  1476. }
  1477. TStringStream Answer;
  1478. IOutputStream::TPart Parts[1];
  1479. };
  1480. class TBuffers: public THttpErrorResponseFormatter, public TTcpSocket::IBuffers {
  1481. public:
  1482. TBuffers(
  1483. unsigned theHttpCode,
  1484. const TString& theDescr,
  1485. const THttpVersion& theVer,
  1486. bool closeConnection,
  1487. const TString& headers
  1488. )
  1489. : THttpErrorResponseFormatter(theHttpCode, theDescr, theVer, closeConnection, headers)
  1490. , IOVec(Parts, 1)
  1491. {
  1492. }
  1493. TContIOVector* GetIOvec() override {
  1494. return &IOVec;
  1495. }
  1496. TContIOVector IOVec;
  1497. };
  1498. TTcpSocket::TSendedData sd(new TBuffers(httpCode, descr, ver, SeenMessageWithoutKeepalive_, headers));
  1499. SendData(requestId, sd);
  1500. }
  1501. void ProcessPipeline() {
  1502. // on successfull response to current (PrimaryResponse_) request
  1503. TAtomicBase requestId;
  1504. if (PipelineOrder_.Dequeue(&requestId)) {
  1505. TAtomicBase oldReqId;
  1506. do {
  1507. oldReqId = AtomicGet(PrimaryResponse_);
  1508. Y_ABORT_UNLESS(oldReqId, "race inside http pipelining");
  1509. } while (!AtomicCas(&PrimaryResponse_, requestId, oldReqId));
  1510. ProcessResponsesData();
  1511. } else {
  1512. TAtomicBase oldReqId = AtomicGet(PrimaryResponse_);
  1513. if (oldReqId) {
  1514. while (!AtomicCas(&PrimaryResponse_, 0, oldReqId)) {
  1515. Y_ABORT_UNLESS(oldReqId == AtomicGet(PrimaryResponse_), "race inside http pipelining [2]");
  1516. }
  1517. }
  1518. }
  1519. }
  1520. void ProcessResponsesData() {
  1521. // process responses data queue, send response (if already have next PrimaryResponse_)
  1522. TResponseDataRef rd;
  1523. while (ResponsesDataQueue_.Dequeue(&rd)) {
  1524. ResponsesData_[rd->RequestId_] = rd;
  1525. }
  1526. TAtomicBase requestId = AtomicGet(PrimaryResponse_);
  1527. if (requestId) {
  1528. THashMap<TAtomicBase, TResponseDataRef>::iterator it = ResponsesData_.find(requestId);
  1529. if (it != ResponsesData_.end()) {
  1530. // has next primary response
  1531. rd = it->second;
  1532. ResponsesData_.erase(it);
  1533. AS_->AsyncWrite(rd->Data_, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
  1534. }
  1535. }
  1536. }
  1537. private:
  1538. void SendData(TAtomicBase requestId, TTcpSocket::TSendedData sd) {
  1539. TContIOVector& vec = *sd->GetIOvec();
  1540. if (requestId != AtomicGet(PrimaryResponse_)) {
  1541. // already has another request for response first, so push this to queue
  1542. // + enqueue event for safe checking queue (at local/transport thread)
  1543. TResponseDataRef rdr = new TResponseData(requestId, sd);
  1544. ResponsesDataQueue_.Enqueue(rdr);
  1545. AS_->GetIOService().Post(std::bind(&TConn::ProcessResponsesData, SelfRef()));
  1546. return;
  1547. }
  1548. if (THttp2Options::ServerUseDirectWrite) {
  1549. vec.Proceed(AS_->WriteSome(vec));
  1550. }
  1551. if (!vec.Complete()) {
  1552. DBGOUT("AsyncWrite()");
  1553. AS_->AsyncWrite(sd, std::bind(&TConn::OnSend, SelfRef(), _1, _2, _3), THttp2Options::ServerOutputDeadline);
  1554. } else {
  1555. // run ProcessPipeline at safe thread
  1556. AS_->GetIOService().Post(std::bind(&TConn::ProcessPipeline, SelfRef()));
  1557. }
  1558. }
  1559. void OnSend(const TErrorCode& ec, size_t amount, IHandlingContext&) {
  1560. Y_UNUSED(amount);
  1561. if (ec) {
  1562. OnError();
  1563. } else {
  1564. ProcessPipeline();
  1565. }
  1566. if (SeenMessageWithoutKeepalive_) {
  1567. TErrorCode shutdown_ec;
  1568. AS_->Shutdown(TTcpSocket::ShutdownBoth, shutdown_ec);
  1569. }
  1570. }
  1571. public:
  1572. bool IsCanceled() const noexcept {
  1573. return Canceled_;
  1574. }
  1575. const TString& RemoteHost() const noexcept {
  1576. return RemoteHost_;
  1577. }
  1578. private:
  1579. TWeakPtrB<TConn> WeakThis_;
  1580. THttpServer& HS_;
  1581. TTcpSocketRef AS_;
  1582. TString RemoteHost_;
  1583. size_t BuffSize_;
  1584. TArrayHolder<char> Buff_;
  1585. TAutoPtr<THttpParser> P_;
  1586. // pipeline supporting
  1587. TAtomic PrimaryResponse_ = 0;
  1588. TLockFreeQueue<TAtomicBase> PipelineOrder_;
  1589. TLockFreeQueue<TResponseDataRef> ResponsesDataQueue_;
  1590. THashMap<TAtomicBase, TResponseDataRef> ResponsesData_;
  1591. TAtomicBool Canceled_;
  1592. TAtomicBool SeenMessageWithoutKeepalive_ = false;
  1593. i32 LeftRequestsToDisconnect_ = -1;
  1594. };
  1595. ///////////////////////////////////////////////////////////
  1596. public:
  1597. THttpServer(IOnRequest* cb, const TParsedLocation& loc)
  1598. : E_(THttp2Options::AsioServerThreads)
  1599. , CB_(cb)
  1600. , LimitRequestsPerConnection(THttp2Options::LimitRequestsPerConnection)
  1601. {
  1602. TNetworkAddress addr = THttp2Options::RespectHostInHttpServerNetworkAddress ?
  1603. TNetworkAddress(TString(loc.Host), loc.GetPort())
  1604. : TNetworkAddress(loc.GetPort());
  1605. for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
  1606. TEndpoint ep(new NAddr::TAddrInfo(&*it));
  1607. TTcpAcceptorPtr a(new TTcpAcceptor(AcceptExecutor_.GetIOService()));
  1608. DBGOUT("bind:" << ep.IpToString() << ":" << ep.Port());
  1609. a->Bind(ep);
  1610. a->Listen(THttp2Options::Backlog);
  1611. StartAccept(a.Get());
  1612. A_.push_back(a);
  1613. }
  1614. }
  1615. ~THttpServer() override {
  1616. AcceptExecutor_.SyncShutdown(); //cancel operation for all current sockets (include acceptors)
  1617. A_.clear(); //stop listening
  1618. E_.SyncShutdown();
  1619. }
  1620. void OnAccept(TTcpAcceptor* a, TAtomicSharedPtr<TTcpSocket> s, const TErrorCode& ec, IHandlingContext&) {
  1621. if (Y_UNLIKELY(ec)) {
  1622. if (ec.Value() == ECANCELED) {
  1623. return;
  1624. } else if (ec.Value() == EMFILE || ec.Value() == ENFILE || ec.Value() == ENOMEM || ec.Value() == ENOBUFS) {
  1625. //reach some os limit, suspend accepting
  1626. TAtomicSharedPtr<TDeadlineTimer> dt(new TDeadlineTimer(a->GetIOService()));
  1627. dt->AsyncWaitExpireAt(TDuration::Seconds(30), std::bind(&THttpServer::OnTimeoutSuspendAccept, this, a, dt, _1, _2));
  1628. return;
  1629. } else {
  1630. Cdbg << "acc: " << ec.Text() << Endl;
  1631. }
  1632. } else {
  1633. if (static_cast<size_t>(HttpInConnCounter()->Val()) < HttpInConnLimits()->Hard()) {
  1634. try {
  1635. SetNonBlock(s->Native());
  1636. PrepareSocket(s->Native());
  1637. TConn::Create(*this, s);
  1638. } catch (TSystemError& err) {
  1639. TErrorCode ec2(err.Status());
  1640. Cdbg << "acc: " << ec2.Text() << Endl;
  1641. }
  1642. } //else accepted socket will be closed
  1643. }
  1644. StartAccept(a); //continue accepting
  1645. }
  1646. void OnTimeoutSuspendAccept(TTcpAcceptor* a, TAtomicSharedPtr<TDeadlineTimer>, const TErrorCode& ec, IHandlingContext&) {
  1647. if (!ec) {
  1648. DBGOUT("resume acceptor")
  1649. StartAccept(a);
  1650. }
  1651. }
  1652. void OnRequest(IRequest* r) {
  1653. try {
  1654. CB_->OnRequest(r);
  1655. } catch (...) {
  1656. Cdbg << CurrentExceptionMessage() << Endl;
  1657. }
  1658. }
  1659. protected:
  1660. void OnCreateConn() noexcept {
  1661. HttpInConnCounter()->Inc();
  1662. }
  1663. void OnDestroyConn() noexcept {
  1664. HttpInConnCounter()->Dec();
  1665. }
  1666. TDuration GetKeepAliveTimeout() const noexcept {
  1667. size_t cc = HttpInConnCounter()->Val();
  1668. TFdLimits lim(*HttpInConnLimits());
  1669. if (!TFdLimits::ExceedLimit(cc, lim.Soft())) {
  1670. return THttp2Options::ServerInputDeadlineKeepAliveMax;
  1671. }
  1672. if (cc > lim.Hard()) {
  1673. cc = lim.Hard();
  1674. }
  1675. TDuration::TValue softTuneRange = THttp2Options::ServerInputDeadlineKeepAliveMax.Seconds() - THttp2Options::ServerInputDeadlineKeepAliveMin.Seconds();
  1676. return TDuration::Seconds((softTuneRange * (cc - lim.Soft())) / (lim.Hard() - lim.Soft() + 1)) + THttp2Options::ServerInputDeadlineKeepAliveMin;
  1677. }
  1678. private:
  1679. void StartAccept(TTcpAcceptor* a) {
  1680. TAtomicSharedPtr<TTcpSocket> s(new TTcpSocket(E_.Size() ? E_.GetExecutor().GetIOService() : AcceptExecutor_.GetIOService()));
  1681. a->AsyncAccept(*s, std::bind(&THttpServer::OnAccept, this, a, s, _1, _2));
  1682. }
  1683. TIOServiceExecutor AcceptExecutor_;
  1684. TVector<TTcpAcceptorPtr> A_;
  1685. TExecutorsPool E_;
  1686. IOnRequest* CB_;
  1687. public:
  1688. const i32 LimitRequestsPerConnection;
  1689. };
  1690. template <class T>
  1691. class THttp2Protocol: public IProtocol {
  1692. public:
  1693. IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
  1694. return new THttpServer(cb, loc);
  1695. }
  1696. THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
  1697. THttpRequest::THandleRef ret(new THttpRequest::THandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
  1698. try {
  1699. THttpRequest::Run(ret, msg, &T::Build, T::RequestSettings());
  1700. } catch (...) {
  1701. ret->ResetOnRecv();
  1702. throw;
  1703. }
  1704. return ret.Get();
  1705. }
  1706. TStringBuf Scheme() const noexcept override {
  1707. return T::Name();
  1708. }
  1709. bool SetOption(TStringBuf name, TStringBuf value) override {
  1710. return THttp2Options::Set(name, value);
  1711. }
  1712. };
  1713. }
  1714. namespace NNeh {
  1715. IProtocol* Http1Protocol() {
  1716. return Singleton<THttp2Protocol<TRequestGet1>>();
  1717. }
  1718. IProtocol* Post1Protocol() {
  1719. return Singleton<THttp2Protocol<TRequestPost1>>();
  1720. }
  1721. IProtocol* Full1Protocol() {
  1722. return Singleton<THttp2Protocol<TRequestFull1>>();
  1723. }
  1724. IProtocol* Http2Protocol() {
  1725. return Singleton<THttp2Protocol<TRequestGet2>>();
  1726. }
  1727. IProtocol* Post2Protocol() {
  1728. return Singleton<THttp2Protocol<TRequestPost2>>();
  1729. }
  1730. IProtocol* Full2Protocol() {
  1731. return Singleton<THttp2Protocol<TRequestFull2>>();
  1732. }
  1733. IProtocol* UnixSocketGetProtocol() {
  1734. return Singleton<THttp2Protocol<TRequestUnixSocketGet>>();
  1735. }
  1736. IProtocol* UnixSocketPostProtocol() {
  1737. return Singleton<THttp2Protocol<TRequestUnixSocketPost>>();
  1738. }
  1739. IProtocol* UnixSocketFullProtocol() {
  1740. return Singleton<THttp2Protocol<TRequestUnixSocketFull>>();
  1741. }
  1742. void SetHttp2OutputConnectionsLimits(size_t softLimit, size_t hardLimit) {
  1743. HttpConnManager()->SetLimits(softLimit, hardLimit);
  1744. }
  1745. void SetHttp2InputConnectionsLimits(size_t softLimit, size_t hardLimit) {
  1746. HttpInConnLimits()->SetSoft(softLimit);
  1747. HttpInConnLimits()->SetHard(hardLimit);
  1748. }
  1749. TAtomicBase GetHttpOutputConnectionCount() {
  1750. return HttpOutConnCounter()->Val();
  1751. }
  1752. std::pair<size_t, size_t> GetHttpOutputConnectionLimits() {
  1753. return HttpConnManager()->GetLimits();
  1754. }
  1755. TAtomicBase GetHttpInputConnectionCount() {
  1756. return HttpInConnCounter()->Val();
  1757. }
  1758. void SetHttp2InputConnectionsTimeouts(unsigned minSeconds, unsigned maxSeconds) {
  1759. THttp2Options::ServerInputDeadlineKeepAliveMin = TDuration::Seconds(minSeconds);
  1760. THttp2Options::ServerInputDeadlineKeepAliveMax = TDuration::Seconds(maxSeconds);
  1761. }
  1762. class TUnixSocketResolver {
  1763. public:
  1764. NDns::TResolvedHost* Resolve(const TString& path) {
  1765. TString unixSocketPath = path;
  1766. if (path.size() > 2 && path[0] == '[' && path[path.size() - 1] == ']') {
  1767. unixSocketPath = path.substr(1, path.size() - 2);
  1768. }
  1769. if (auto resolvedUnixSocket = ResolvedUnixSockets_.FindPtr(unixSocketPath)) {
  1770. return resolvedUnixSocket->Get();
  1771. }
  1772. TNetworkAddress na{TUnixSocketPath(unixSocketPath)};
  1773. ResolvedUnixSockets_[unixSocketPath] = MakeHolder<NDns::TResolvedHost>(unixSocketPath, na);
  1774. return ResolvedUnixSockets_[unixSocketPath].Get();
  1775. }
  1776. private:
  1777. THashMap<TString, THolder<NDns::TResolvedHost>> ResolvedUnixSockets_;
  1778. };
  1779. TUnixSocketResolver* UnixSocketResolver() {
  1780. return FastTlsSingleton<TUnixSocketResolver>();
  1781. }
  1782. const NDns::TResolvedHost* Resolve(const TStringBuf host, ui16 port, NHttp::EResolverType resolverType) {
  1783. if (resolverType == EResolverType::EUNIXSOCKET) {
  1784. return UnixSocketResolver()->Resolve(TString(host));
  1785. }
  1786. return NDns::CachedResolve(NDns::TResolveInfo(host, port));
  1787. }
  1788. }