// connection_pool_ut.cpp (5.7 KB)
  #include "simple_server.h"

  #include <yt/cpp/mapreduce/http/http.h>
  #include <yt/cpp/mapreduce/interface/config.h>

  #include <library/cpp/threading/future/async.h>
  #include <library/cpp/http/io/stream.h>
  #include <library/cpp/testing/gtest/gtest.h>
  #include <library/cpp/testing/common/network.h>

  #include <util/string/builder.h>
  #include <util/stream/tee.h>
  #include <util/system/thread.h>

  #include <stdexcept>
  11. using namespace NYT;
  12. namespace {
  13. void ParseFirstLine(const TString firstLine, TString& method, TString& host , ui64& port, TString& command)
  14. {
  15. size_t idx = firstLine.find_first_of(' ');
  16. method = firstLine.substr(0, idx);
  17. size_t idx2 = firstLine.find_first_of(':', idx + 1);
  18. host = firstLine.substr(idx + 1, idx2 - idx - 1);
  19. idx = firstLine.find_first_of('/', idx2 + 1);
  20. port = std::atoi(firstLine.substr(idx2 + 1, idx - idx2 - 1).c_str());
  21. idx2 = firstLine.find_first_of(' ', idx + 1);
  22. command = firstLine.substr(idx, idx2 - idx);
  23. }
  24. } // namespace
  25. THolder<TSimpleServer> CreateSimpleHttpServer()
  26. {
  27. auto port = NTesting::GetFreePort();
  28. return MakeHolder<TSimpleServer>(
  29. port,
  30. [] (IInputStream* input, IOutputStream* output) {
  31. try {
  32. while (true) {
  33. THttpInput httpInput(input);
  34. httpInput.ReadAll();
  35. THttpOutput httpOutput(output);
  36. httpOutput.EnableKeepAlive(true);
  37. httpOutput << "HTTP/1.1 200 OK\r\n";
  38. httpOutput << "\r\n";
  39. for (size_t i = 0; i != 10000; ++i) {
  40. httpOutput << "The grass was greener";
  41. }
  42. httpOutput.Flush();
  43. }
  44. } catch (const std::exception&) {
  45. }
  46. });
  47. }
  48. THolder<TSimpleServer> CreateProxyHttpServer()
  49. {
  50. auto port = NTesting::GetFreePort();
  51. return MakeHolder<TSimpleServer>(
  52. port,
  53. [] (IInputStream* input, IOutputStream* output) {
  54. try {
  55. while (true) {
  56. THttpInput httpInput(input);
  57. const TString inputStr = httpInput.FirstLine();
  58. auto headers = httpInput.Headers();
  59. TString method, command, host;
  60. ui64 port;
  61. ParseFirstLine(inputStr, method, host, port, command);
  62. const TString hostName = ::TStringBuilder() << host << ":" << port;
  63. auto header = THttpHeader(method, command);
  64. THttpRequest request("0-0-0-0", hostName, header, TDuration::Zero());
  65. request.StartRequest();
  66. request.FinishRequest();
  67. auto res = request.GetResponseStream();
  68. THttpOutput httpOutput(output);
  69. httpOutput.EnableKeepAlive(true);
  70. auto strRes = res->ReadAll();
  71. httpOutput << "HTTP/1.1 200 OK\r\n";
  72. httpOutput << "\r\n";
  73. httpOutput << strRes;
  74. httpOutput.Flush();
  75. }
  76. } catch (const std::exception&) {
  77. }
  78. });
  79. }
  80. class TConnectionPoolConfigGuard
  81. {
  82. public:
  83. TConnectionPoolConfigGuard(int newSize)
  84. {
  85. OldValue_ = TConfig::Get()->ConnectionPoolSize;
  86. TConfig::Get()->ConnectionPoolSize = newSize;
  87. }
  88. ~TConnectionPoolConfigGuard()
  89. {
  90. TConfig::Get()->ConnectionPoolSize = OldValue_;
  91. }
  92. private:
  93. int OldValue_;
  94. };
  95. class TFuncThread
  96. : public ISimpleThread
  97. {
  98. public:
  99. using TFunc = std::function<void()>;
  100. public:
  101. TFuncThread(const TFunc& func)
  102. : Func_(func)
  103. { }
  104. void* ThreadProc() noexcept override {
  105. Func_();
  106. return nullptr;
  107. }
  108. private:
  109. TFunc Func_;
  110. };
  111. TEST(TConnectionPool, TestReleaseUnread)
  112. {
  113. auto simpleServer = CreateSimpleHttpServer();
  114. const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
  115. for (size_t i = 0; i != 10; ++i) {
  116. THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero());
  117. request.StartRequest();
  118. request.FinishRequest();
  119. request.GetResponseStream();
  120. }
  121. }
  122. TEST(TConnectionPool, TestProxy)
  123. {
  124. auto simpleServer = CreateSimpleHttpServer();
  125. auto simpleServer2 = CreateProxyHttpServer();
  126. const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
  127. const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort();
  128. for (size_t i = 0; i != 10; ++i) {
  129. auto header = THttpHeader("GET", "foo");
  130. header.SetProxyAddress(hostName2);
  131. header.SetHostPort(hostName);
  132. THttpRequest request("0-0-0-0", hostName2, header, TDuration::Zero());
  133. request.StartRequest();
  134. request.FinishRequest();
  135. request.GetResponseStream();
  136. }
  137. }
  138. TEST(TConnectionPool, TestConcurrency)
  139. {
  140. TConnectionPoolConfigGuard g(1);
  141. auto simpleServer = CreateSimpleHttpServer();
  142. const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
  143. auto threadPool = CreateThreadPool(20);
  144. const auto func = [&] {
  145. for (int i = 0; i != 100; ++i) {
  146. THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero());
  147. request.StartRequest();
  148. request.FinishRequest();
  149. auto res = request.GetResponseStream();
  150. res->ReadAll();
  151. }
  152. };
  153. TVector<THolder<TFuncThread>> threads;
  154. for (int i = 0; i != 10; ++i) {
  155. threads.push_back(MakeHolder<TFuncThread>(func));
  156. };
  157. for (auto& t : threads) {
  158. t->Start();
  159. }
  160. for (auto& t : threads) {
  161. t->Join();
  162. }
  163. }