http_ut.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022
  1. #include "http.h"
  2. #include "http_ex.h"
  3. #include <library/cpp/testing/unittest/registar.h>
  4. #include <library/cpp/testing/unittest/tests_data.h>
  5. #include <util/generic/cast.h>
  6. #include <util/stream/output.h>
  7. #include <util/stream/zlib.h>
  8. #include <util/system/datetime.h>
  9. #include <util/system/mutex.h>
  10. #include <util/random/random.h>
  11. Y_UNIT_TEST_SUITE(THttpServerTest) {
  12. class TEchoServer: public THttpServer::ICallBack {
  13. class TRequest: public THttpClientRequestEx {
  14. public:
  15. inline TRequest(TEchoServer* parent)
  16. : Parent_(parent)
  17. {
  18. }
  19. bool Reply(void* /*tsr*/) override {
  20. if (!ProcessHeaders()) {
  21. return true;
  22. }
  23. Output() << "HTTP/1.1 200 Ok\r\n\r\n";
  24. if (Buf.Size()) {
  25. Output().Write(Buf.AsCharPtr(), Buf.Size());
  26. } else {
  27. Output() << Parent_->Res_;
  28. }
  29. Output().Finish();
  30. return true;
  31. }
  32. private:
  33. TEchoServer* Parent_ = nullptr;
  34. };
  35. public:
  36. inline TEchoServer(const TString& res)
  37. : Res_(res)
  38. {
  39. }
  40. TClientRequest* CreateClient() override {
  41. return new TRequest(this);
  42. }
  43. private:
  44. TString Res_;
  45. };
  46. class TSleepingServer: public THttpServer::ICallBack {
  47. class TReplier: public TRequestReplier {
  48. public:
  49. inline TReplier(TSleepingServer* server)
  50. : Server(server)
  51. {
  52. }
  53. bool BeforeParseRequestOk(void*) override {
  54. if (Server->Ttl && (TInstant::Now() - CreateTime > TDuration::MilliSeconds(Server->Ttl))) {
  55. Output().Write("HTTP/1.0 503 Created\nX-Server: sleeping server\n\nTTL Exceed");
  56. return false;
  57. } else {
  58. return true;
  59. }
  60. }
  61. bool DoReply(const TReplyParams& params) override {
  62. ++Server->Replies;
  63. with_lock (Server->Lock) {
  64. params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
  65. params.Output.Finish();
  66. }
  67. return true;
  68. }
  69. using TClientRequest::Output;
  70. private:
  71. TSleepingServer* Server = nullptr;
  72. TInstant CreateTime = TInstant::Now();
  73. };
  74. public:
  75. TSleepingServer(size_t ttl = 0)
  76. : Ttl(ttl) {}
  77. TClientRequest* CreateClient() override {
  78. return new TReplier(this);
  79. }
  80. void OnMaxConn() override {
  81. ++MaxConns;
  82. }
  83. public:
  84. TMutex Lock;
  85. std::atomic<size_t> Replies;
  86. std::atomic<size_t> MaxConns;
  87. size_t Ttl;
  88. };
  89. static const TString CrLf = "\r\n";
  90. struct TTestRequest {
  91. TTestRequest(ui16 port, TString content = TString())
  92. : Port(port)
  93. , Content(std::move(content))
  94. {
  95. }
  96. void CheckContinue(TSocketInput& si) {
  97. if (Expect100Continue) {
  98. TStringStream ss;
  99. TString firstLine;
  100. si.ReadLine(firstLine);
  101. for (;;) {
  102. TString buf;
  103. si.ReadLine(buf);
  104. if (buf.size() == 0) {
  105. break;
  106. }
  107. ss << buf << CrLf;
  108. }
  109. UNIT_ASSERT_EQUAL(firstLine, "HTTP/1.1 100 Continue");
  110. }
  111. }
  112. TString Execute() {
  113. TSocket* s = nullptr;
  114. THolder<TSocket> singleReqSocket;
  115. if (KeepAliveConnection) {
  116. if (!KeepAlivedSocket) {
  117. KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10));
  118. }
  119. s = KeepAlivedSocket.Get();
  120. } else {
  121. TNetworkAddress addr("localhost", Port);
  122. singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10)));
  123. s = singleReqSocket.Get();
  124. }
  125. bool isPost = Type == "POST";
  126. TSocketInput si(*s);
  127. if (UseHttpOutput) {
  128. TSocketOutput so(*s);
  129. THttpOutput output(&so);
  130. output.EnableKeepAlive(KeepAliveConnection);
  131. output.EnableCompression(EnableResponseEncoding);
  132. TStringStream r;
  133. r << Type << " / HTTP/1.1" << CrLf;
  134. r << "Host: localhost:" + ToString(Port) << CrLf;
  135. if (isPost) {
  136. if (ContentEncoding.size()) {
  137. r << "Content-Encoding: " << ContentEncoding << CrLf;
  138. } else {
  139. r << "Transfer-Encoding: chunked" << CrLf;
  140. }
  141. if (Expect100Continue) {
  142. r << "Expect: 100-continue" << CrLf;
  143. }
  144. }
  145. r << CrLf;
  146. if (isPost) {
  147. output.Write(r.Str());
  148. output.Flush();
  149. CheckContinue(si);
  150. output.Write(Content);
  151. output.Finish();
  152. } else {
  153. output.Write(r.Str());
  154. output.Finish();
  155. }
  156. } else {
  157. TStringStream r;
  158. r << Type << " / HTTP/1.1" << CrLf;
  159. r << "Host: localhost:" + ToString(Port) << CrLf;
  160. if (KeepAliveConnection) {
  161. r << "Connection: Keep-Alive" << CrLf;
  162. } else {
  163. r << "Connection: Close" << CrLf;
  164. }
  165. if (EnableResponseEncoding) {
  166. r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf;
  167. }
  168. if (isPost && Expect100Continue) {
  169. r << "Expect: 100-continue" << CrLf;
  170. }
  171. if (isPost && ContentEncoding.size() && Content.size()) {
  172. r << "Content-Encoding: " << ContentEncoding << CrLf;
  173. TStringStream compressedContent;
  174. {
  175. TZLibCompress zlib(&compressedContent);
  176. zlib.Write(Content.data(), Content.size());
  177. zlib.Flush();
  178. zlib.Finish();
  179. }
  180. r << "Content-Length: " << compressedContent.Size() << CrLf;
  181. r << CrLf;
  182. s->Send(r.Data(), r.Size());
  183. CheckContinue(si);
  184. Hdr = r.Str();
  185. TString tosend = compressedContent.Str();
  186. s->Send(tosend.data(), tosend.size());
  187. } else {
  188. if (isPost) {
  189. r << "Content-Length: " << Content.size() << CrLf;
  190. r << CrLf;
  191. s->Send(r.Data(), r.Size());
  192. CheckContinue(si);
  193. Hdr = r.Str();
  194. s->Send(Content.data(), Content.size());
  195. } else {
  196. r << CrLf;
  197. Hdr = r.Str();
  198. s->Send(r.Data(), r.Size());
  199. }
  200. }
  201. }
  202. THttpInput input(&si);
  203. TStringStream ss;
  204. TransferData(&input, &ss);
  205. return ss.Str();
  206. }
  207. TString GetDescription() const {
  208. if (UseHttpOutput) {
  209. TStringStream ss;
  210. ss << (KeepAliveConnection ? "keep-alive " : "") << Type;
  211. if (ContentEncoding.size()) {
  212. ss << " with encoding=" << ContentEncoding;
  213. }
  214. return ss.Str();
  215. } else {
  216. return Hdr;
  217. }
  218. }
  219. ui16 Port = 0;
  220. bool UseHttpOutput = true;
  221. TString Type = "GET";
  222. TString ContentEncoding;
  223. TString Content;
  224. bool KeepAliveConnection = false;
  225. THolder<TSocket> KeepAlivedSocket;
  226. bool EnableResponseEncoding = false;
  227. TString Hdr;
  228. bool Expect100Continue = false;
  229. };
  230. class TFailingMtpQueue: public TSimpleThreadPool {
  231. private:
  232. bool FailOnAdd_ = false;
  233. public:
  234. void SetFailOnAdd(bool fail = true) {
  235. FailOnAdd_ = fail;
  236. }
  237. [[nodiscard]] bool Add(IObjectInQueue* pObj) override {
  238. if (FailOnAdd_) {
  239. return false;
  240. }
  241. return TSimpleThreadPool::Add(pObj);
  242. }
  243. TFailingMtpQueue() = default;
  244. TFailingMtpQueue(IThreadFactory* pool)
  245. : TSimpleThreadPool(pool)
  246. {
  247. }
  248. };
  249. TString TestData(size_t size = 5 * 4096) {
  250. TString res;
  251. for (size_t i = 0; i < size; ++i) {
  252. res += (char)i;
  253. }
  254. return res;
  255. }
  256. Y_UNIT_TEST(TestEchoServer) {
  257. TString res = TestData();
  258. TPortManager pm;
  259. const ui16 port = pm.GetPort();
  260. const bool trueFalse[] = {true, false};
  261. TEchoServer serverImpl(res);
  262. THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
  263. for (int i = 0; i < 2; ++i) {
  264. UNIT_ASSERT(server.Start());
  265. TTestRequest r(port);
  266. r.Content = res;
  267. for (bool keepAlive : trueFalse) {
  268. r.KeepAliveConnection = keepAlive;
  269. // THttpOutput use chunked stream, else use Content-Length
  270. for (bool useHttpOutput : trueFalse) {
  271. r.UseHttpOutput = useHttpOutput;
  272. for (bool enableResponseEncoding : trueFalse) {
  273. r.EnableResponseEncoding = enableResponseEncoding;
  274. const TString reqTypes[] = {"GET", "POST"};
  275. for (const TString& reqType : reqTypes) {
  276. r.Type = reqType;
  277. const TString encoders[] = {"", "deflate"};
  278. for (const TString& encoder : encoders) {
  279. r.ContentEncoding = encoder;
  280. for (bool expect100Continue : trueFalse) {
  281. r.Expect100Continue = expect100Continue;
  282. TString resp = r.Execute();
  283. UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
  284. }
  285. }
  286. }
  287. }
  288. }
  289. }
  290. server.Stop();
  291. }
  292. }
  293. Y_UNIT_TEST(TestReusePortEnabled) {
  294. if (!IsReusePortAvailable()) {
  295. return; // skip test
  296. }
  297. TString res = TestData();
  298. TPortManager pm;
  299. const ui16 port = pm.GetPort();
  300. TEchoServer serverImpl(res);
  301. TVector<THolder<THttpServer>> servers;
  302. for (ui32 i = 0; i < 10; i++) {
  303. servers.push_back(MakeHolder<THttpServer>(&serverImpl, THttpServer::TOptions(port).EnableReusePort(true)));
  304. }
  305. for (ui32 testRun = 0; testRun < 3; testRun++) {
  306. for (auto& server : servers) {
  307. // start servers one at a time and check at least one of them is replying
  308. UNIT_ASSERT(server->Start());
  309. TTestRequest r(port, res);
  310. UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
  311. }
  312. for (auto& server : servers) {
  313. // ping servers and stop them one at a time
  314. // at the last iteration only one server is still working and then gets stopped as well
  315. TTestRequest r(port, res);
  316. UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
  317. server->Stop();
  318. }
  319. }
  320. }
  321. Y_UNIT_TEST(TestReusePortDisabled) {
  322. // check that with the ReusePort option disabled it's impossible to start two servers on the same port
  323. // check that ReusePort option is disabled by default (don't set it explicitly in the test)
  324. TPortManager pm;
  325. const ui16 port = pm.GetPort();
  326. TEchoServer serverImpl(TString{});
  327. THttpServer server1(&serverImpl, THttpServer::TOptions(port));
  328. THttpServer server2(&serverImpl, THttpServer::TOptions(port));
  329. UNIT_ASSERT(true == server1.Start());
  330. UNIT_ASSERT(false == server2.Start());
  331. server1.Stop();
  332. // Stop() is a sync call, port should be free by now
  333. UNIT_ASSERT(true == server2.Start());
  334. UNIT_ASSERT(false == server1.Start());
  335. }
  336. Y_UNIT_TEST(TestFailServer) {
  337. /**
  338. * Emulate request processing failures
  339. * Data should be large enough not to fit into socket buffer
  340. **/
  341. TString res = TestData(10 * 1024 * 1024);
  342. TPortManager portManager;
  343. const ui16 port = portManager.GetPort();
  344. TEchoServer serverImpl(res);
  345. THttpServer::TOptions options(port);
  346. options.EnableKeepAlive(true);
  347. options.EnableCompression(true);
  348. using TFailingServerMtpQueue = TThreadPoolBinder<TFailingMtpQueue, THttpServer::ICallBack>;
  349. THttpServer::TMtpQueueRef mainWorkers = new TFailingServerMtpQueue(&serverImpl, SystemThreadFactory());
  350. THttpServer::TMtpQueueRef failWorkers = new TThreadPool(SystemThreadFactory());
  351. THttpServer server(&serverImpl, mainWorkers, failWorkers, options);
  352. UNIT_ASSERT(server.Start());
  353. for (size_t i = 0; i < 3; ++i) {
  354. // should fail on 2nd request
  355. static_cast<TFailingMtpQueue*>(mainWorkers.Get())->SetFailOnAdd(i == 1);
  356. TTestRequest r(port);
  357. r.Content = res;
  358. r.Type = "POST";
  359. TString resp = r.Execute();
  360. if (i == 1) {
  361. UNIT_ASSERT(resp.Contains("Service Unavailable"));
  362. } else {
  363. UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
  364. }
  365. }
  366. server.Stop();
  367. }
  368. class TReleaseConnectionServer: public THttpServer::ICallBack {
  369. class TRequest: public THttpClientRequestEx {
  370. public:
  371. bool Reply(void* /*tsr*/) override {
  372. Output() << "HTTP/1.1 200 Ok\r\n\r\n";
  373. Output() << "reply";
  374. Output().Finish();
  375. ReleaseConnection();
  376. throw yexception() << "some error";
  377. return true;
  378. }
  379. };
  380. public:
  381. TClientRequest* CreateClient() override {
  382. return new TRequest();
  383. }
  384. void OnException() override {
  385. ExceptionMessage = CurrentExceptionMessage();
  386. }
  387. TString ExceptionMessage;
  388. };
  389. class TResetConnectionServer: public THttpServer::ICallBack {
  390. class TRequest: public TClientRequest {
  391. public:
  392. bool Reply(void* /*tsr*/) override {
  393. Output() << "HTTP/1.1";
  394. ResetConnection();
  395. return true;
  396. }
  397. };
  398. public:
  399. TClientRequest* CreateClient() override {
  400. return new TRequest();
  401. }
  402. void OnException() override {
  403. ExceptionMessage = CurrentExceptionMessage();
  404. }
  405. TString ExceptionMessage;
  406. };
  407. class TListenerSockAddrReplyServer: public THttpServer::ICallBack {
  408. class TRequest: public TClientRequest {
  409. public:
  410. bool Reply(void* /*tsr*/) override {
  411. Output() << "HTTP/1.1 200 Ok\r\n\r\n";
  412. Output() << PrintHostAndPort(*GetListenerSockAddrRef());
  413. Output().Finish();
  414. return true;
  415. }
  416. };
  417. public:
  418. TClientRequest* CreateClient() override {
  419. return new TRequest();
  420. }
  421. };
  422. Y_UNIT_TEST(TTestResetConnection) {
  423. TPortManager pm;
  424. const ui16 port = pm.GetPort();
  425. TResetConnectionServer serverImpl;
  426. THttpServer server(&serverImpl, THttpServer::TOptions(port));
  427. UNIT_ASSERT(server.Start());
  428. TTestRequest r(port, "request");
  429. UNIT_ASSERT_EXCEPTION_CONTAINS(r.Execute(), TSystemError, "Connection reset by peer");
  430. server.Stop();
  431. }
  432. Y_UNIT_TEST(TTestReleaseConnection) {
  433. TPortManager pm;
  434. const ui16 port = pm.GetPort();
  435. TReleaseConnectionServer serverImpl;
  436. THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true));
  437. UNIT_ASSERT(server.Start());
  438. TTestRequest r(port, "request");
  439. r.KeepAliveConnection = true;
  440. UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription());
  441. server.Stop();
  442. UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error");
  443. }
  444. THttpInput SendRequest(TSocket& socket, ui16 port) {
  445. TSocketInput si(socket);
  446. TSocketOutput so(socket);
  447. THttpOutput out(&so);
  448. out.EnableKeepAlive(true);
  449. out << "GET / HTTP/1.1" << CrLf;
  450. out << "Host: localhost:" + ToString(port) << CrLf;
  451. out << CrLf;
  452. out.Flush();
  453. THttpInput input(&si);
  454. input.ReadAll();
  455. return input;
  456. }
  457. THttpInput SendRequestWithBody(TSocket& socket, ui16 port, TString body) {
  458. TSocketInput si(socket);
  459. TSocketOutput so(socket);
  460. THttpOutput out(&so);
  461. out << "POST / HTTP/1.1" << CrLf;
  462. out << "Host: localhost:" + ToString(port) << CrLf;
  463. out << "Content-Length: " + ToString(body.size()) << CrLf;
  464. out << CrLf;
  465. out << body;
  466. out.Flush();
  467. THttpInput input(&si);
  468. input.ReadAll();
  469. return input;
  470. }
  471. Y_UNIT_TEST(TTestExpirationTimeout) {
  472. TPortManager pm;
  473. const ui16 port = pm.GetPort();
  474. TEchoServer serverImpl("test_data");
  475. THttpServer::TOptions options(port);
  476. options.nThreads = 1;
  477. options.MaxQueueSize = 0;
  478. options.MaxConnections = 0;
  479. options.KeepAliveEnabled = true;
  480. options.ExpirationTimeout = TDuration::Seconds(1);
  481. options.PollTimeout = TDuration::MilliSeconds(100);
  482. THttpServer server(&serverImpl, options);
  483. UNIT_ASSERT(server.Start());
  484. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
  485. SendRequest(socket, port);
  486. SendRequest(socket, port);
  487. Sleep(TDuration::Seconds(5));
  488. UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
  489. server.Stop();
  490. }
  491. Y_UNIT_TEST(TTestContentLengthTooLarge) {
  492. TPortManager pm;
  493. const ui16 port = pm.GetPort();
  494. TEchoServer serverImpl("test_data");
  495. THttpServer::TOptions options(port);
  496. options.nThreads = 1;
  497. options.MaxQueueSize = 0;
  498. options.MaxInputContentLength = 2_KB;
  499. options.MaxConnections = 0;
  500. options.KeepAliveEnabled = false;
  501. options.ExpirationTimeout = TDuration::Seconds(1);
  502. options.PollTimeout = TDuration::MilliSeconds(100);
  503. THttpServer server(&serverImpl, options);
  504. UNIT_ASSERT(server.Start());
  505. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5));
  506. UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket, port, TString(1_KB, 'a')).FirstLine(), "HTTP/1.1 200 Ok");
  507. TSocket socket2(TNetworkAddress("localhost", port), TDuration::Seconds(5));
  508. UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket2, port, TString(10_KB, 'a')).FirstLine(), "HTTP/1.1 413 Payload Too Large");
  509. server.Stop();
  510. }
  511. Y_UNIT_TEST(TTestNullInRequest) {
  512. TPortManager pm;
  513. const ui16 port = pm.GetPort();
  514. TEchoServer serverImpl("test_data");
  515. THttpServer::TOptions options(port);
  516. options.nThreads = 1;
  517. options.MaxQueueSize = 0;
  518. options.MaxConnections = 0;
  519. options.KeepAliveEnabled = false;
  520. options.ExpirationTimeout = TDuration::Seconds(1);
  521. options.PollTimeout = TDuration::MilliSeconds(100);
  522. THttpServer server(&serverImpl, options);
  523. UNIT_ASSERT(server.Start());
  524. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5));
  525. TSocketInput si(socket);
  526. TSocketOutput so(socket);
  527. THttpOutput out(&so);
  528. out << "GET \0/ggg HTTP/1.1" << CrLf;
  529. out << "Host: localhost:" + ToString(port) << CrLf;
  530. out << CrLf;
  531. out.Flush();
  532. THttpInput input(&si);
  533. input.ReadAll();
  534. UNIT_ASSERT_STRING_CONTAINS(input.FirstLine(), "HTTP/1.1 4");
  535. server.Stop();
  536. }
  537. Y_UNIT_TEST(TTestCloseConnectionOnRequestLimit) {
  538. TPortManager pm;
  539. const ui16 port = pm.GetPort();
  540. TEchoServer serverImpl("test_data");
  541. THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxRequestsPerConnection(2));
  542. UNIT_ASSERT(server.Start());
  543. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
  544. UNIT_ASSERT(SendRequest(socket, port).IsKeepAlive());
  545. UNIT_ASSERT(!SendRequest(socket, port).IsKeepAlive());
  546. UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
  547. server.Stop();
  548. }
  549. Y_UNIT_TEST(TTestListenerSockAddrConnection) {
  550. TPortManager pm;
  551. const ui16 port1 = pm.GetPort();
  552. const ui16 port2 = pm.GetPort();
  553. TListenerSockAddrReplyServer serverImpl;
  554. THttpServer server(&serverImpl, THttpServer::TOptions().EnableKeepAlive(true).AddBindAddress("127.0.0.1", port1).AddBindAddress("127.0.0.1", port2));
  555. UNIT_ASSERT(server.Start());
  556. TTestRequest r1(port1);
  557. r1.KeepAliveConnection = true;
  558. TString resp = r1.Execute();
  559. UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port1)));
  560. TTestRequest r2(port2);
  561. r2.KeepAliveConnection = true;
  562. resp = r2.Execute();
  563. UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port2)));
  564. server.Stop();
  565. }
  566. Y_UNIT_TEST(TestSocketsLeak) {
  567. TPortManager portManager;
  568. TString res = TestData(25);
  569. const bool trueFalse[] = {true, false};
  570. for (bool rejectExcessConnections : trueFalse) {
  571. for (bool keepAlive : trueFalse) {
  572. const ui16 port = portManager.GetPort();
  573. TSleepingServer server;
  574. THttpServer::TOptions options(port);
  575. options.nThreads = 1;
  576. options.MaxConnections = 1;
  577. options.MaxQueueSize = 10;
  578. options.MaxFQueueSize = 2;
  579. options.nFThreads = 2;
  580. options.KeepAliveEnabled = true;
  581. options.RejectExcessConnections = rejectExcessConnections;
  582. THttpServer srv(&server, options);
  583. UNIT_ASSERT(srv.Start());
  584. UNIT_ASSERT(server.Lock.TryAcquire());
  585. std::atomic<size_t> threadsFinished = 0;
  586. TVector<THolder<IThreadFactory::IThread>> threads;
  587. auto func = [port, keepAlive, &threadsFinished]() {
  588. try {
  589. TTestRequest r(port);
  590. r.KeepAliveConnection = keepAlive;
  591. r.Execute();
  592. } catch (...) {
  593. }
  594. ++threadsFinished;
  595. };
  596. threads.push_back(SystemThreadFactory()->Run(func));
  597. while (server.Replies.load() != 1) { //wait while we have one connection inside server
  598. Sleep(TDuration::MilliSeconds(1));
  599. }
  600. for (size_t i = 1; i < 3; ++i) {
  601. threads.push_back(SystemThreadFactory()->Run(func));
  602. //in case of rejectExcessConnections next requests will fail, otherwise will stuck inside server queue
  603. while ((rejectExcessConnections ? threadsFinished.load() : srv.GetRequestQueueSize()) != i) {
  604. Sleep(TDuration::MilliSeconds(1));
  605. }
  606. }
  607. server.Lock.Release();
  608. for (auto&& thread : threads) {
  609. thread->Join();
  610. }
  611. TStringStream opts;
  612. opts << " [" << rejectExcessConnections << ", " << keepAlive << "] ";
  613. UNIT_ASSERT_EQUAL_C(server.MaxConns, 2, opts.Str() + "we should get MaxConn notification 2 times, got " + ToString(server.MaxConns.load()));
  614. if (rejectExcessConnections) {
  615. UNIT_ASSERT_EQUAL_C(server.Replies, 1, opts.Str() + "only one request should have been processed, got " + ToString(server.Replies.load()));
  616. } else {
  617. UNIT_ASSERT_VALUES_EQUAL(server.Replies.load(), 3);
  618. }
  619. }
  620. }
  621. }
  622. class TShooter {
  623. public:
  624. struct TCounters {
  625. public:
  626. TCounters() = default;
  627. TCounters(const TCounters& other)
  628. : Fail(other.Fail.load())
  629. , Success(other.Success.load())
  630. {
  631. }
  632. public:
  633. std::atomic<size_t> Fail = 0;
  634. std::atomic<size_t> Success = 0;
  635. };
  636. public:
  637. TShooter(size_t threadCount, ui16 port)
  638. : Counters_(threadCount)
  639. {
  640. for (size_t i = 0; i < threadCount; ++i) {
  641. auto func = [i, port, this] () {
  642. for (;;) {
  643. try {
  644. TTestRequest r(port);
  645. r.KeepAliveConnection = true;
  646. for (size_t j = 0; j < 100; ++j) {
  647. if (Stopped_.load()) {
  648. return;
  649. }
  650. r.Execute();
  651. Sleep(TDuration::MilliSeconds(1) * RandomNumber<float>());
  652. Counters_[i].Success++;
  653. }
  654. } catch (TSystemError& e) {
  655. UNIT_ASSERT_C(e.Status() == ECONNRESET || e.Status() == ECONNREFUSED, CurrentExceptionMessage());
  656. Counters_[i].Fail++;
  657. } catch (THttpReadException&) {
  658. Counters_[i].Fail++;
  659. } catch (...) {
  660. UNIT_ASSERT_C(false, CurrentExceptionMessage());
  661. }
  662. }
  663. };
  664. Threads_.push_back(SystemThreadFactory()->Run(func));
  665. }
  666. }
  667. void Stop() {
  668. Stopped_.store(true);
  669. for (auto& thread : Threads_) {
  670. thread->Join();
  671. }
  672. }
  673. void WaitProgress() const {
  674. auto snapshot = Counters_;
  675. for (;;) {
  676. size_t haveProgress = 0;
  677. for (size_t i = 0; i < Counters_.size(); ++i) {
  678. haveProgress += (Counters_[i].Fail.load() + Counters_[i].Success.load()) > (snapshot[i].Fail + snapshot[i].Success);
  679. }
  680. if (haveProgress == Counters_.size()) {
  681. return;
  682. }
  683. Sleep(TDuration::MilliSeconds(1));
  684. }
  685. }
  686. const auto& GetCounters() const {
  687. return Counters_;
  688. }
  689. ~TShooter() {
  690. Stop();
  691. }
  692. private:
  693. TVector<THolder<IThreadFactory::IThread>> Threads_;
  694. std::atomic<bool> Stopped_ = false;
  695. TVector<TCounters> Counters_;
  696. };
  697. struct TTestConfig {
  698. bool OneShot = false;
  699. ui32 ListenerThreads = 1;
  700. };
  701. TVector<TTestConfig> testConfigs = {
  702. {.OneShot = false, .ListenerThreads = 1},
  703. {.OneShot = true, .ListenerThreads = 1},
  704. {.OneShot = true, .ListenerThreads = 4},
  705. {.OneShot = true, .ListenerThreads = 63},
  706. };
  707. THttpServer::TOptions ApplyConfig(const THttpServer::TOptions& opts, const TTestConfig& cfg) {
  708. THttpServer::TOptions res = opts;
  709. res.OneShotPoll = cfg.OneShot;
  710. res.nListenerThreads = cfg.ListenerThreads;
  711. return res;
  712. }
  713. Y_UNIT_TEST(TestStartStop) {
  714. TPortManager pm;
  715. const ui16 port = pm.GetPort();
  716. const size_t threadCount = 5;
  717. TShooter shooter(threadCount, port);
  718. TString res = TestData();
  719. for (const auto& cfg : testConfigs) {
  720. TEchoServer serverImpl(res);
  721. THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true), cfg));
  722. for (size_t i = 0; i < 100; ++i) {
  723. UNIT_ASSERT(server.Start());
  724. shooter.WaitProgress();
  725. {
  726. auto before = shooter.GetCounters();
  727. shooter.WaitProgress();
  728. auto after = shooter.GetCounters();
  729. for (size_t i = 0; i < before.size(); ++i) {
  730. UNIT_ASSERT(before[i].Success < after[i].Success);
  731. UNIT_ASSERT(before[i].Fail == after[i].Fail);
  732. }
  733. }
  734. server.Stop();
  735. shooter.WaitProgress();
  736. {
  737. auto before = shooter.GetCounters();
  738. shooter.WaitProgress();
  739. auto after = shooter.GetCounters();
  740. for (size_t i = 0; i < before.size(); ++i) {
  741. UNIT_ASSERT(before[i].Success == after[i].Success);
  742. UNIT_ASSERT(before[i].Fail < after[i].Fail);
  743. }
  744. }
  745. }
  746. }
  747. }
  748. Y_UNIT_TEST(TestMaxConnections) {
  749. class TMaxConnServer
  750. : public TEchoServer
  751. {
  752. public:
  753. using TEchoServer::TEchoServer;
  754. void OnMaxConn() override {
  755. ++MaxConns;
  756. }
  757. public:
  758. std::atomic<size_t> MaxConns = 0;
  759. };
  760. TPortManager pm;
  761. const ui16 port = pm.GetPort();
  762. const size_t maxConnections = 5;
  763. TString res = TestData();
  764. for (const auto& cfg : testConfigs) {
  765. TMaxConnServer serverImpl(res);
  766. THttpServer server(&serverImpl, ApplyConfig(THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxConnections(maxConnections), cfg));
  767. UNIT_ASSERT(server.Start());
  768. TShooter shooter(maxConnections + 1, port);
  769. for (size_t i = 0; i < 100; ++i) {
  770. const size_t prev = serverImpl.MaxConns.load();
  771. while (serverImpl.MaxConns.load() < prev + 100) {
  772. Sleep(TDuration::MilliSeconds(1));
  773. }
  774. }
  775. shooter.Stop();
  776. server.Stop();
  777. for (const auto& c : shooter.GetCounters()) {
  778. UNIT_ASSERT(c.Success > 0);
  779. UNIT_ASSERT(c.Fail > 0);
  780. UNIT_ASSERT(c.Success > c.Fail);
  781. }
  782. }
  783. }
  784. Y_UNIT_TEST(StartFail) {
  785. TString res = TestData();
  786. TEchoServer serverImpl(res);
  787. {
  788. THttpServer server(&serverImpl, THttpServer::TOptions(1));
  789. UNIT_ASSERT(!server.GetErrorCode());
  790. UNIT_ASSERT(!server.Start());
  791. UNIT_ASSERT(server.GetErrorCode());
  792. }
  793. {
  794. TPortManager pm;
  795. const ui16 port = pm.GetPort();
  796. THttpServer server1(&serverImpl, THttpServer::TOptions(port));
  797. UNIT_ASSERT(server1.Start());
  798. UNIT_ASSERT(!server1.GetErrorCode());
  799. THttpServer server2(&serverImpl, THttpServer::TOptions(port));
  800. UNIT_ASSERT(!server2.Start());
  801. UNIT_ASSERT(server2.GetErrorCode());
  802. }
  803. }
  804. inline TString ToString(const THashSet<TString>& hs) {
  805. TString res = "";
  806. for (auto s : hs) {
  807. if (res) {
  808. res.append(",");
  809. }
  810. res.append("\"").append(s).append("\"");
  811. }
  812. return res;
  813. }
  814. Y_UNIT_TEST(TestTTLExceed) {
  815. // Checks that one of request returns "TTL Exceed"
  816. // First request waits for server.Lock.Release() for one threaded TSleepingServer
  817. // So second request in queue should fail with TTL Exceed, because fist one lock thread pool for (ttl + 1) ms
  818. TPortManager portManager;
  819. const ui16 port = portManager.GetPort();
  820. TString res = TestData(25);
  821. const size_t ttl = 10;
  822. TSleepingServer server{ttl};
  823. THttpServer::TOptions options(port);
  824. options.nThreads = 1;
  825. options.MaxConnections = 2;
  826. THttpServer srv(&server, options);
  827. UNIT_ASSERT(srv.Start());
  828. UNIT_ASSERT(server.Lock.TryAcquire());
  829. THashSet<TString> results;
  830. TMutex resultLock;
  831. auto func = [port, &resultLock, &results]() {
  832. try {
  833. TTestRequest r(port);
  834. TString result = r.Execute();
  835. with_lock(resultLock) {
  836. results.insert(result);
  837. }
  838. } catch (...) {
  839. }
  840. };
  841. auto t1 = SystemThreadFactory()->Run(func);
  842. auto t2 = SystemThreadFactory()->Run(func);
  843. Sleep(TDuration::MilliSeconds(ttl + 1));
  844. server.Lock.Release();
  845. t1->Join();
  846. t2->Join();
  847. UNIT_ASSERT_EQUAL_C(results, (THashSet<TString>({"Zoooo", "TTL Exceed"})), "Results is {" + ToString(results) + "}");
  848. }
  849. }