// host_manager.cpp (3.8 KB)
  1. #include "host_manager.h"
  2. #include "context.h"
  3. #include "helpers.h"
  4. #include "http.h"
  5. #include "http_client.h"
  6. #include "requests.h"
  7. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <library/cpp/json/json_reader.h>
  10. #include <util/generic/guid.h>
  11. #include <util/generic/vector.h>
  12. #include <util/generic/singleton.h>
  13. #include <util/generic/ymath.h>
  14. #include <util/random/random.h>
  15. #include <util/string/vector.h>
  16. namespace NYT::NPrivate {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. static TVector<TString> ParseJsonStringArray(const TString& response)
  19. {
  20. NJson::TJsonValue value;
  21. TStringInput input(response);
  22. NJson::ReadJsonTree(&input, &value);
  23. const NJson::TJsonValue::TArray& array = value.GetArray();
  24. TVector<TString> result;
  25. result.reserve(array.size());
  26. for (size_t i = 0; i < array.size(); ++i) {
  27. result.push_back(array[i].GetString());
  28. }
  29. return result;
  30. }
  31. ////////////////////////////////////////////////////////////////////////////////
  32. class THostManager::TClusterHostList
  33. {
  34. public:
  35. explicit TClusterHostList(TVector<TString> hosts)
  36. : Hosts_(std::move(hosts))
  37. , Timestamp_(TInstant::Now())
  38. { }
  39. explicit TClusterHostList(std::exception_ptr error)
  40. : Error_(std::move(error))
  41. , Timestamp_(TInstant::Now())
  42. { }
  43. TString ChooseHostOrThrow() const
  44. {
  45. if (Error_) {
  46. std::rethrow_exception(Error_);
  47. }
  48. if (Hosts_.empty()) {
  49. ythrow yexception() << "fetched list of proxies is empty";
  50. }
  51. return Hosts_[RandomNumber<size_t>(Hosts_.size())];
  52. }
  53. TDuration GetAge() const
  54. {
  55. return TInstant::Now() - Timestamp_;
  56. }
  57. private:
  58. TVector<TString> Hosts_;
  59. std::exception_ptr Error_;
  60. TInstant Timestamp_;
  61. };
  62. ////////////////////////////////////////////////////////////////////////////////
  63. THostManager& THostManager::Get()
  64. {
  65. return *Singleton<THostManager>();
  66. }
  67. void THostManager::Reset()
  68. {
  69. auto guard = Guard(Lock_);
  70. ClusterHosts_.clear();
  71. }
  72. TString THostManager::GetProxyForHeavyRequest(const TClientContext& context)
  73. {
  74. auto cluster = context.ProxyAddress ? *context.ProxyAddress : context.ServerName;
  75. {
  76. auto guard = Guard(Lock_);
  77. auto it = ClusterHosts_.find(cluster);
  78. if (it != ClusterHosts_.end() && it->second.GetAge() < context.Config->HostListUpdateInterval) {
  79. return it->second.ChooseHostOrThrow();
  80. }
  81. }
  82. auto hostList = GetHosts(context);
  83. auto result = hostList.ChooseHostOrThrow();
  84. {
  85. auto guard = Guard(Lock_);
  86. ClusterHosts_.emplace(cluster, std::move(hostList));
  87. }
  88. return result;
  89. }
  90. THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& context)
  91. {
  92. TString hostsEndpoint = context.Config->Hosts;
  93. while (hostsEndpoint.StartsWith("/")) {
  94. hostsEndpoint = hostsEndpoint.substr(1);
  95. }
  96. THttpHeader header("GET", hostsEndpoint, false);
  97. try {
  98. auto requestId = CreateGuidAsString();
  99. // TODO: we need to set socket timeout here
  100. UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
  101. auto response = context.HttpClient->Request(GetFullUrlForProxy(context.ServerName, context, header), requestId, header);
  102. auto hosts = ParseJsonStringArray(response->GetResponse());
  103. for (auto& host : hosts) {
  104. host = CreateHostNameWithPort(host, context);
  105. }
  106. return TClusterHostList(std::move(hosts));
  107. } catch (const std::exception& e) {
  108. return TClusterHostList(std::current_exception());
  109. }
  110. }
  111. ////////////////////////////////////////////////////////////////////////////////
  112. } // namespace NYT::NPrivate