// connection_pool_ut.cpp
  1. #include "simple_server.h"
  2. #include <yt/cpp/mapreduce/http/http.h>
  3. #include <yt/cpp/mapreduce/interface/config.h>
  4. #include <library/cpp/threading/future/async.h>
  5. #include <library/cpp/http/io/stream.h>
  6. #include <library/cpp/testing/gtest/gtest.h>
  7. #include <library/cpp/testing/common/network.h>
  8. #include <util/string/builder.h>
  9. #include <util/stream/tee.h>
  10. #include <util/system/thread.h>
  11. using namespace NYT;
  12. namespace {
  13. void ParseFirstLine(const TString firstLine, TString& method, TString& host , ui64& port, TString& command)
  14. {
  15. size_t idx = firstLine.find_first_of(' ');
  16. method = firstLine.substr(0, idx);
  17. size_t idx2 = firstLine.find_first_of(':', idx + 1);
  18. host = firstLine.substr(idx + 1, idx2 - idx - 1);
  19. idx = firstLine.find_first_of('/', idx2 + 1);
  20. port = std::atoi(firstLine.substr(idx2 + 1, idx - idx2 - 1).c_str());
  21. idx2 = firstLine.find_first_of(' ', idx + 1);
  22. command = firstLine.substr(idx, idx2 - idx);
  23. }
  24. } // namespace
  25. THolder<TSimpleServer> CreateSimpleHttpServer()
  26. {
  27. auto port = NTesting::GetFreePort();
  28. return MakeHolder<TSimpleServer>(
  29. port,
  30. [] (IInputStream* input, IOutputStream* output) {
  31. try {
  32. while (true) {
  33. THttpInput httpInput(input);
  34. httpInput.ReadAll();
  35. THttpOutput httpOutput(output);
  36. httpOutput.EnableKeepAlive(true);
  37. httpOutput << "HTTP/1.1 200 OK\r\n";
  38. httpOutput << "\r\n";
  39. for (size_t i = 0; i != 10000; ++i) {
  40. httpOutput << "The grass was greener";
  41. }
  42. httpOutput.Flush();
  43. }
  44. } catch (const std::exception&) {
  45. }
  46. });
  47. }
  48. THolder<TSimpleServer> CreateProxyHttpServer()
  49. {
  50. auto port = NTesting::GetFreePort();
  51. return MakeHolder<TSimpleServer>(
  52. port,
  53. [] (IInputStream* input, IOutputStream* output) {
  54. try {
  55. while (true) {
  56. THttpInput httpInput(input);
  57. const TString inputStr = httpInput.FirstLine();
  58. auto headers = httpInput.Headers();
  59. TString method, command, host;
  60. ui64 port;
  61. ParseFirstLine(inputStr, method, host, port, command);
  62. THttpRequest request;
  63. const TString hostName = ::TStringBuilder() << host << ":" << port;
  64. request.Connect(hostName);
  65. auto header = THttpHeader(method, command);
  66. request.StartRequest(header);
  67. request.FinishRequest();
  68. auto res = request.GetResponseStream();
  69. THttpOutput httpOutput(output);
  70. httpOutput.EnableKeepAlive(true);
  71. auto strRes = res->ReadAll();
  72. httpOutput << "HTTP/1.1 200 OK\r\n";
  73. httpOutput << "\r\n";
  74. httpOutput << strRes;
  75. httpOutput.Flush();
  76. }
  77. } catch (const std::exception&) {
  78. }
  79. });
  80. }
  81. class TConnectionPoolConfigGuard
  82. {
  83. public:
  84. TConnectionPoolConfigGuard(int newSize)
  85. {
  86. OldValue_ = TConfig::Get()->ConnectionPoolSize;
  87. TConfig::Get()->ConnectionPoolSize = newSize;
  88. }
  89. ~TConnectionPoolConfigGuard()
  90. {
  91. TConfig::Get()->ConnectionPoolSize = OldValue_;
  92. }
  93. private:
  94. int OldValue_;
  95. };
  96. class TFuncThread
  97. : public ISimpleThread
  98. {
  99. public:
  100. using TFunc = std::function<void()>;
  101. public:
  102. TFuncThread(const TFunc& func)
  103. : Func_(func)
  104. { }
  105. void* ThreadProc() noexcept override {
  106. Func_();
  107. return nullptr;
  108. }
  109. private:
  110. TFunc Func_;
  111. };
  112. TEST(TConnectionPool, TestReleaseUnread)
  113. {
  114. auto simpleServer = CreateSimpleHttpServer();
  115. const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
  116. for (size_t i = 0; i != 10; ++i) {
  117. THttpRequest request;
  118. request.Connect(hostName);
  119. request.StartRequest(THttpHeader("GET", "foo"));
  120. request.FinishRequest();
  121. request.GetResponseStream();
  122. }
  123. }
  124. TEST(TConnectionPool, TestProxy)
  125. {
  126. auto simpleServer = CreateSimpleHttpServer();
  127. auto simpleServer2 = CreateProxyHttpServer();
  128. const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
  129. const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort();
  130. for (size_t i = 0; i != 10; ++i) {
  131. THttpRequest request;
  132. request.Connect(hostName2);
  133. auto header = THttpHeader("GET", "foo");
  134. header.SetProxyAddress(hostName2);
  135. header.SetHostPort(hostName);
  136. request.StartRequest(header);
  137. request.FinishRequest();
  138. request.GetResponseStream();
  139. }
  140. }
  141. TEST(TConnectionPool, TestConcurrency)
  142. {
  143. TConnectionPoolConfigGuard g(1);
  144. auto simpleServer = CreateSimpleHttpServer();
  145. const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
  146. auto threadPool = CreateThreadPool(20);
  147. const auto func = [&] {
  148. for (int i = 0; i != 100; ++i) {
  149. THttpRequest request;
  150. request.Connect(hostName);
  151. request.StartRequest(THttpHeader("GET", "foo"));
  152. request.FinishRequest();
  153. auto res = request.GetResponseStream();
  154. res->ReadAll();
  155. }
  156. };
  157. TVector<THolder<TFuncThread>> threads;
  158. for (int i = 0; i != 10; ++i) {
  159. threads.push_back(MakeHolder<TFuncThread>(func));
  160. };
  161. for (auto& t : threads) {
  162. t->Start();
  163. }
  164. for (auto& t : threads) {
  165. t->Join();
  166. }
  167. }