http_ut.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739
  1. #include "http.h"
  2. #include "http_ex.h"
  3. #include <library/cpp/testing/unittest/registar.h>
  4. #include <library/cpp/testing/unittest/tests_data.h>
  5. #include <util/generic/cast.h>
  6. #include <util/stream/output.h>
  7. #include <util/stream/zlib.h>
  8. #include <util/system/datetime.h>
  9. #include <util/system/sem.h>
  10. Y_UNIT_TEST_SUITE(THttpServerTest) {
  11. class TEchoServer: public THttpServer::ICallBack {
  12. class TRequest: public THttpClientRequestEx {
  13. public:
  14. inline TRequest(TEchoServer* parent)
  15. : Parent_(parent)
  16. {
  17. }
  18. bool Reply(void* /*tsr*/) override {
  19. if (!ProcessHeaders()) {
  20. return true;
  21. }
  22. Output() << "HTTP/1.1 200 Ok\r\n\r\n";
  23. if (Buf.Size()) {
  24. Output().Write(Buf.AsCharPtr(), Buf.Size());
  25. } else {
  26. Output() << Parent_->Res_;
  27. }
  28. Output().Finish();
  29. return true;
  30. }
  31. private:
  32. TEchoServer* Parent_ = nullptr;
  33. };
  34. public:
  35. inline TEchoServer(const TString& res)
  36. : Res_(res)
  37. {
  38. }
  39. TClientRequest* CreateClient() override {
  40. return new TRequest(this);
  41. }
  42. private:
  43. TString Res_;
  44. };
  45. class TSleepingServer: public THttpServer::ICallBack {
  46. class TReplier: public TRequestReplier {
  47. public:
  48. inline TReplier(TSleepingServer* server)
  49. : Server(server)
  50. {
  51. }
  52. bool DoReply(const TReplyParams& params) override {
  53. Server->FreeThread();
  54. Server->Busy(1);
  55. params.Output.Write("HTTP/1.0 201 Created\nX-Server: sleeping server\n\nZoooo");
  56. params.Output.Finish();
  57. Server->Replies->Inc();
  58. return true;
  59. }
  60. private:
  61. TSleepingServer* Server = nullptr;
  62. };
  63. public:
  64. inline TSleepingServer(unsigned int size)
  65. : Semaphore("conns", size)
  66. , Semaphore2("threads", 1)
  67. , Replies(new TAtomicCounter())
  68. , MaxConns(new TAtomicCounter())
  69. {
  70. }
  71. void ResetCounters() {
  72. Replies.Reset(new TAtomicCounter());
  73. MaxConns.Reset(new TAtomicCounter());
  74. }
  75. long RepliesCount() const {
  76. return Replies->Val();
  77. }
  78. long MaxConnsCount() const {
  79. return MaxConns->Val();
  80. }
  81. TClientRequest* CreateClient() override {
  82. return new TReplier(this);
  83. }
  84. void OnMaxConn() override {
  85. MaxConns->Inc();
  86. }
  87. void OnFailRequest(int) override {
  88. FreeThread();
  89. Busy(1);
  90. }
  91. void Busy(int count) {
  92. while (count-- > 0) {
  93. Semaphore.Acquire();
  94. }
  95. }
  96. void BusyThread() {
  97. Semaphore2.Acquire();
  98. }
  99. void Free(int count) {
  100. while (count-- > 0) {
  101. Semaphore.Release();
  102. }
  103. }
  104. void FreeThread() {
  105. Semaphore2.Release();
  106. }
  107. private:
  108. TSemaphore Semaphore;
  109. TSemaphore Semaphore2;
  110. THolder<TAtomicCounter> Replies;
  111. THolder<TAtomicCounter> MaxConns;
  112. };
  113. static const TString CrLf = "\r\n";
  114. struct TTestRequest {
  115. TTestRequest(ui16 port, TString content = TString())
  116. : Port(port)
  117. , Content(std::move(content))
  118. {
  119. }
  120. void CheckContinue(TSocketInput& si) {
  121. if (Expect100Continue) {
  122. TStringStream ss;
  123. TString firstLine;
  124. si.ReadLine(firstLine);
  125. for (;;) {
  126. TString buf;
  127. si.ReadLine(buf);
  128. if (buf.size() == 0) {
  129. break;
  130. }
  131. ss << buf << CrLf;
  132. }
  133. UNIT_ASSERT_EQUAL(firstLine, "HTTP/1.1 100 Continue");
  134. }
  135. }
  136. TString Execute() {
  137. TSocket* s = nullptr;
  138. THolder<TSocket> singleReqSocket;
  139. if (KeepAliveConnection) {
  140. if (!KeepAlivedSocket) {
  141. KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10));
  142. }
  143. s = KeepAlivedSocket.Get();
  144. } else {
  145. TNetworkAddress addr("localhost", Port);
  146. singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10)));
  147. s = singleReqSocket.Get();
  148. }
  149. bool isPost = Type == "POST";
  150. TSocketInput si(*s);
  151. if (UseHttpOutput) {
  152. TSocketOutput so(*s);
  153. THttpOutput output(&so);
  154. output.EnableKeepAlive(KeepAliveConnection);
  155. output.EnableCompression(EnableResponseEncoding);
  156. TStringStream r;
  157. r << Type << " / HTTP/1.1" << CrLf;
  158. r << "Host: localhost:" + ToString(Port) << CrLf;
  159. if (isPost) {
  160. if (ContentEncoding.size()) {
  161. r << "Content-Encoding: " << ContentEncoding << CrLf;
  162. } else {
  163. r << "Transfer-Encoding: chunked" << CrLf;
  164. }
  165. if (Expect100Continue) {
  166. r << "Expect: 100-continue" << CrLf;
  167. }
  168. }
  169. r << CrLf;
  170. if (isPost) {
  171. output.Write(r.Str());
  172. output.Flush();
  173. CheckContinue(si);
  174. output.Write(Content);
  175. output.Finish();
  176. } else {
  177. output.Write(r.Str());
  178. output.Finish();
  179. }
  180. } else {
  181. TStringStream r;
  182. r << Type << " / HTTP/1.1" << CrLf;
  183. r << "Host: localhost:" + ToString(Port) << CrLf;
  184. if (KeepAliveConnection) {
  185. r << "Connection: Keep-Alive" << CrLf;
  186. } else {
  187. r << "Connection: Close" << CrLf;
  188. }
  189. if (EnableResponseEncoding) {
  190. r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf;
  191. }
  192. if (isPost && Expect100Continue) {
  193. r << "Expect: 100-continue" << CrLf;
  194. }
  195. if (isPost && ContentEncoding.size() && Content.size()) {
  196. r << "Content-Encoding: " << ContentEncoding << CrLf;
  197. TStringStream compressedContent;
  198. {
  199. TZLibCompress zlib(&compressedContent);
  200. zlib.Write(Content.data(), Content.size());
  201. zlib.Flush();
  202. zlib.Finish();
  203. }
  204. r << "Content-Length: " << compressedContent.Size() << CrLf;
  205. r << CrLf;
  206. s->Send(r.Data(), r.Size());
  207. CheckContinue(si);
  208. Hdr = r.Str();
  209. TString tosend = compressedContent.Str();
  210. s->Send(tosend.data(), tosend.size());
  211. } else {
  212. if (isPost) {
  213. r << "Content-Length: " << Content.size() << CrLf;
  214. r << CrLf;
  215. s->Send(r.Data(), r.Size());
  216. CheckContinue(si);
  217. Hdr = r.Str();
  218. s->Send(Content.data(), Content.size());
  219. } else {
  220. r << CrLf;
  221. Hdr = r.Str();
  222. s->Send(r.Data(), r.Size());
  223. }
  224. }
  225. }
  226. THttpInput input(&si);
  227. TStringStream ss;
  228. TransferData(&input, &ss);
  229. return ss.Str();
  230. }
  231. TString GetDescription() const {
  232. if (UseHttpOutput) {
  233. TStringStream ss;
  234. ss << (KeepAliveConnection ? "keep-alive " : "") << Type;
  235. if (ContentEncoding.size()) {
  236. ss << " with encoding=" << ContentEncoding;
  237. }
  238. return ss.Str();
  239. } else {
  240. return Hdr;
  241. }
  242. }
  243. ui16 Port = 0;
  244. bool UseHttpOutput = true;
  245. TString Type = "GET";
  246. TString ContentEncoding;
  247. TString Content;
  248. bool KeepAliveConnection = false;
  249. THolder<TSocket> KeepAlivedSocket;
  250. bool EnableResponseEncoding = false;
  251. TString Hdr;
  252. bool Expect100Continue = false;
  253. };
  254. class TFailingMtpQueue: public TSimpleThreadPool {
  255. private:
  256. bool FailOnAdd_ = false;
  257. public:
  258. void SetFailOnAdd(bool fail = true) {
  259. FailOnAdd_ = fail;
  260. }
  261. [[nodiscard]] bool Add(IObjectInQueue* pObj) override {
  262. if (FailOnAdd_) {
  263. return false;
  264. }
  265. return TSimpleThreadPool::Add(pObj);
  266. }
  267. TFailingMtpQueue() = default;
  268. TFailingMtpQueue(IThreadFactory* pool)
  269. : TSimpleThreadPool(pool)
  270. {
  271. }
  272. };
  273. TString TestData(size_t size = 5 * 4096) {
  274. TString res;
  275. for (size_t i = 0; i < size; ++i) {
  276. res += (char)i;
  277. }
  278. return res;
  279. }
  280. Y_UNIT_TEST(TestEchoServer) {
  281. TString res = TestData();
  282. TPortManager pm;
  283. const ui16 port = pm.GetPort();
  284. const bool trueFalse[] = {true, false};
  285. TEchoServer serverImpl(res);
  286. THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true));
  287. for (int i = 0; i < 2; ++i) {
  288. UNIT_ASSERT(server.Start());
  289. TTestRequest r(port);
  290. r.Content = res;
  291. for (bool keepAlive : trueFalse) {
  292. r.KeepAliveConnection = keepAlive;
  293. // THttpOutput use chunked stream, else use Content-Length
  294. for (bool useHttpOutput : trueFalse) {
  295. r.UseHttpOutput = useHttpOutput;
  296. for (bool enableResponseEncoding : trueFalse) {
  297. r.EnableResponseEncoding = enableResponseEncoding;
  298. const TString reqTypes[] = {"GET", "POST"};
  299. for (const TString& reqType : reqTypes) {
  300. r.Type = reqType;
  301. const TString encoders[] = {"", "deflate"};
  302. for (const TString& encoder : encoders) {
  303. r.ContentEncoding = encoder;
  304. for (bool expect100Continue : trueFalse) {
  305. r.Expect100Continue = expect100Continue;
  306. TString resp = r.Execute();
  307. UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
  308. }
  309. }
  310. }
  311. }
  312. }
  313. }
  314. server.Stop();
  315. }
  316. }
  317. Y_UNIT_TEST(TestReusePortEnabled) {
  318. if (!IsReusePortAvailable()) {
  319. return; // skip test
  320. }
  321. TString res = TestData();
  322. TPortManager pm;
  323. const ui16 port = pm.GetPort();
  324. TEchoServer serverImpl(res);
  325. TVector<THolder<THttpServer>> servers;
  326. for (ui32 i = 0; i < 10; i++) {
  327. servers.push_back(MakeHolder<THttpServer>(&serverImpl, THttpServer::TOptions(port).EnableReusePort(true)));
  328. }
  329. for (ui32 testRun = 0; testRun < 3; testRun++) {
  330. for (auto& server : servers) {
  331. // start servers one at a time and check at least one of them is replying
  332. UNIT_ASSERT(server->Start());
  333. TTestRequest r(port, res);
  334. UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
  335. }
  336. for (auto& server : servers) {
  337. // ping servers and stop them one at a time
  338. // at the last iteration only one server is still working and then gets stopped as well
  339. TTestRequest r(port, res);
  340. UNIT_ASSERT_C(r.Execute() == res, "diff echo response for request:\n" + r.GetDescription());
  341. server->Stop();
  342. }
  343. }
  344. }
  345. Y_UNIT_TEST(TestReusePortDisabled) {
  346. // check that with the ReusePort option disabled it's impossible to start two servers on the same port
  347. // check that ReusePort option is disabled by default (don't set it explicitly in the test)
  348. TPortManager pm;
  349. const ui16 port = pm.GetPort();
  350. TEchoServer serverImpl(TString{});
  351. THttpServer server1(&serverImpl, THttpServer::TOptions(port));
  352. THttpServer server2(&serverImpl, THttpServer::TOptions(port));
  353. UNIT_ASSERT(true == server1.Start());
  354. UNIT_ASSERT(false == server2.Start());
  355. server1.Stop();
  356. // Stop() is a sync call, port should be free by now
  357. UNIT_ASSERT(true == server2.Start());
  358. UNIT_ASSERT(false == server1.Start());
  359. }
  360. Y_UNIT_TEST(TestFailServer) {
  361. /**
  362. * Emulate request processing failures
  363. * Data should be large enough not to fit into socket buffer
  364. **/
  365. TString res = TestData(10 * 1024 * 1024);
  366. TPortManager portManager;
  367. const ui16 port = portManager.GetPort();
  368. TEchoServer serverImpl(res);
  369. THttpServer::TOptions options(port);
  370. options.EnableKeepAlive(true);
  371. options.EnableCompression(true);
  372. using TFailingServerMtpQueue = TThreadPoolBinder<TFailingMtpQueue, THttpServer::ICallBack>;
  373. THttpServer::TMtpQueueRef mainWorkers = new TFailingServerMtpQueue(&serverImpl, SystemThreadFactory());
  374. THttpServer::TMtpQueueRef failWorkers = new TThreadPool(SystemThreadFactory());
  375. THttpServer server(&serverImpl, mainWorkers, failWorkers, options);
  376. UNIT_ASSERT(server.Start());
  377. for (size_t i = 0; i < 3; ++i) {
  378. // should fail on 2nd request
  379. static_cast<TFailingMtpQueue*>(mainWorkers.Get())->SetFailOnAdd(i == 1);
  380. TTestRequest r(port);
  381. r.Content = res;
  382. r.Type = "POST";
  383. TString resp = r.Execute();
  384. if (i == 1) {
  385. UNIT_ASSERT(resp.Contains("Service Unavailable"));
  386. } else {
  387. UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription());
  388. }
  389. }
  390. server.Stop();
  391. }
  392. class TReleaseConnectionServer: public THttpServer::ICallBack {
  393. class TRequest: public THttpClientRequestEx {
  394. public:
  395. bool Reply(void* /*tsr*/) override {
  396. Output() << "HTTP/1.1 200 Ok\r\n\r\n";
  397. Output() << "reply";
  398. Output().Finish();
  399. ReleaseConnection();
  400. throw yexception() << "some error";
  401. return true;
  402. }
  403. };
  404. public:
  405. TClientRequest* CreateClient() override {
  406. return new TRequest();
  407. }
  408. void OnException() override {
  409. ExceptionMessage = CurrentExceptionMessage();
  410. }
  411. TString ExceptionMessage;
  412. };
  413. class TResetConnectionServer: public THttpServer::ICallBack {
  414. class TRequest: public TClientRequest {
  415. public:
  416. bool Reply(void* /*tsr*/) override {
  417. Output() << "HTTP/1.1";
  418. ResetConnection();
  419. return true;
  420. }
  421. };
  422. public:
  423. TClientRequest* CreateClient() override {
  424. return new TRequest();
  425. }
  426. void OnException() override {
  427. ExceptionMessage = CurrentExceptionMessage();
  428. }
  429. TString ExceptionMessage;
  430. };
  431. class TListenerSockAddrReplyServer: public THttpServer::ICallBack {
  432. class TRequest: public TClientRequest {
  433. public:
  434. bool Reply(void* /*tsr*/) override {
  435. Output() << "HTTP/1.1 200 Ok\r\n\r\n";
  436. Output() << PrintHostAndPort(*GetListenerSockAddrRef());
  437. Output().Finish();
  438. return true;
  439. }
  440. };
  441. public:
  442. TClientRequest* CreateClient() override {
  443. return new TRequest();
  444. }
  445. };
  446. Y_UNIT_TEST(TTestResetConnection) {
  447. TPortManager pm;
  448. const ui16 port = pm.GetPort();
  449. TResetConnectionServer serverImpl;
  450. THttpServer server(&serverImpl, THttpServer::TOptions(port));
  451. UNIT_ASSERT(server.Start());
  452. TTestRequest r(port, "request");
  453. UNIT_ASSERT_EXCEPTION_CONTAINS(r.Execute(), TSystemError, "Connection reset by peer");
  454. server.Stop();
  455. };
  456. Y_UNIT_TEST(TTestReleaseConnection) {
  457. TPortManager pm;
  458. const ui16 port = pm.GetPort();
  459. TReleaseConnectionServer serverImpl;
  460. THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true));
  461. UNIT_ASSERT(server.Start());
  462. TTestRequest r(port, "request");
  463. r.KeepAliveConnection = true;
  464. UNIT_ASSERT_C(r.Execute() == "reply", "diff echo response for request:\n" + r.GetDescription());
  465. server.Stop();
  466. UNIT_ASSERT_STRINGS_EQUAL(serverImpl.ExceptionMessage, "(yexception) some error");
  467. };
  468. THttpInput SendRequest(TSocket& socket, ui16 port) {
  469. TSocketInput si(socket);
  470. TSocketOutput so(socket);
  471. THttpOutput out(&so);
  472. out.EnableKeepAlive(true);
  473. out << "GET / HTTP/1.1" << CrLf;
  474. out << "Host: localhost:" + ToString(port) << CrLf;
  475. out << CrLf;
  476. out.Flush();
  477. THttpInput input(&si);
  478. input.ReadAll();
  479. return input;
  480. }
  481. THttpInput SendRequestWithBody(TSocket& socket, ui16 port, TString body) {
  482. TSocketInput si(socket);
  483. TSocketOutput so(socket);
  484. THttpOutput out(&so);
  485. out << "POST / HTTP/1.1" << CrLf;
  486. out << "Host: localhost:" + ToString(port) << CrLf;
  487. out << "Content-Length: " + ToString(body.size()) << CrLf;
  488. out << CrLf;
  489. out << body;
  490. out.Flush();
  491. THttpInput input(&si);
  492. input.ReadAll();
  493. return input;
  494. }
  495. Y_UNIT_TEST(TTestExpirationTimeout) {
  496. TPortManager pm;
  497. const ui16 port = pm.GetPort();
  498. TEchoServer serverImpl("test_data");
  499. THttpServer::TOptions options(port);
  500. options.nThreads = 1;
  501. options.MaxQueueSize = 0;
  502. options.MaxConnections = 0;
  503. options.KeepAliveEnabled = true;
  504. options.ExpirationTimeout = TDuration::Seconds(1);
  505. options.PollTimeout = TDuration::MilliSeconds(100);
  506. THttpServer server(&serverImpl, options);
  507. UNIT_ASSERT(server.Start());
  508. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
  509. SendRequest(socket, port);
  510. SendRequest(socket, port);
  511. Sleep(TDuration::Seconds(5));
  512. UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
  513. server.Stop();
  514. }
  515. Y_UNIT_TEST(TTestContentLengthTooLarge) {
  516. TPortManager pm;
  517. const ui16 port = pm.GetPort();
  518. TEchoServer serverImpl("test_data");
  519. THttpServer::TOptions options(port);
  520. options.nThreads = 1;
  521. options.MaxQueueSize = 0;
  522. options.MaxInputContentLength = 2_KB;
  523. options.MaxConnections = 0;
  524. options.KeepAliveEnabled = false;
  525. options.ExpirationTimeout = TDuration::Seconds(1);
  526. options.PollTimeout = TDuration::MilliSeconds(100);
  527. THttpServer server(&serverImpl, options);
  528. UNIT_ASSERT(server.Start());
  529. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(5));
  530. UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket, port, TString(1_KB, 'a')).FirstLine(), "HTTP/1.1 200 Ok");
  531. TSocket socket2(TNetworkAddress("localhost", port), TDuration::Seconds(5));
  532. UNIT_ASSERT_STRING_CONTAINS(SendRequestWithBody(socket2, port, TString(10_KB, 'a')).FirstLine(), "HTTP/1.1 413 Payload Too Large");
  533. server.Stop();
  534. }
  535. Y_UNIT_TEST(TTestCloseConnectionOnRequestLimit) {
  536. TPortManager pm;
  537. const ui16 port = pm.GetPort();
  538. TEchoServer serverImpl("test_data");
  539. THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).SetMaxRequestsPerConnection(2));
  540. UNIT_ASSERT(server.Start());
  541. TSocket socket(TNetworkAddress("localhost", port), TDuration::Seconds(10));
  542. UNIT_ASSERT(SendRequest(socket, port).IsKeepAlive());
  543. UNIT_ASSERT(!SendRequest(socket, port).IsKeepAlive());
  544. UNIT_ASSERT_EXCEPTION(SendRequest(socket, port), THttpReadException);
  545. server.Stop();
  546. }
  547. Y_UNIT_TEST(TTestListenerSockAddrConnection) {
  548. TPortManager pm;
  549. const ui16 port1 = pm.GetPort();
  550. const ui16 port2 = pm.GetPort();
  551. TListenerSockAddrReplyServer serverImpl;
  552. THttpServer server(&serverImpl, THttpServer::TOptions().EnableKeepAlive(true).AddBindAddress("127.0.0.1", port1).AddBindAddress("127.0.0.1", port2));
  553. UNIT_ASSERT(server.Start());
  554. TTestRequest r1(port1);
  555. r1.KeepAliveConnection = true;
  556. TString resp = r1.Execute();
  557. UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port1)));
  558. TTestRequest r2(port2);
  559. r2.KeepAliveConnection = true;
  560. resp = r2.Execute();
  561. UNIT_ASSERT(resp == TString::Join("127.0.0.1", ":", ToString(port2)));
  562. server.Stop();
  563. };
  564. #if 0
  565. Y_UNIT_TEST(TestSocketsLeak) {
  566. const bool trueFalse[] = {true, false};
  567. TPortManager portManager;
  568. const ui16 port = portManager.GetPort();
  569. TString res = TestData(25);
  570. TSleepingServer server(3);
  571. THttpServer::TOptions options(port);
  572. options.MaxConnections = 1;
  573. options.MaxQueueSize = 1;
  574. options.MaxFQueueSize = 2;
  575. options.nFThreads = 2;
  576. options.KeepAliveEnabled = true;
  577. options.RejectExcessConnections = true;
  578. THttpServer srv(&server, options);
  579. UNIT_ASSERT(srv.Start());
  580. for (bool keepAlive : trueFalse) {
  581. server.ResetCounters();
  582. TVector<TAutoPtr<IThreadFactory::IThread>> threads;
  583. server.Busy(3);
  584. server.BusyThread();
  585. for (size_t i = 0; i < 3; ++i) {
  586. auto func = [&server, port, keepAlive]() {
  587. server.BusyThread();
  588. THolder<TTestRequest> r = MakeHolder<TTestRequest>(port);
  589. r->KeepAliveConnection = keepAlive;
  590. r->Execute();
  591. };
  592. threads.push_back(SystemThreadFactory()->Run(func));
  593. }
  594. server.FreeThread(); // all threads get connection & go to processing
  595. Sleep(TDuration::MilliSeconds(100));
  596. server.BusyThread(); // we wait while connections are established by the
  597. // system and accepted by the server
  598. server.Free(3); // we release all connections processing
  599. for (auto&& thread : threads) {
  600. thread->Join();
  601. }
  602. server.Free(3);
  603. server.FreeThread();
  604. UNIT_ASSERT_EQUAL_C(server.MaxConnsCount(), 2, "we should get MaxConn notification 2 times, got " + ToString(server.MaxConnsCount()));
  605. UNIT_ASSERT_EQUAL_C(server.RepliesCount(), 1, "only one request should have been processed, got " + ToString(server.RepliesCount()));
  606. }
  607. }
  608. #endif
  609. }