Browse Source

Add support for using an HTTP proxy when reading tables from YT

Add support for using an HTTP proxy when reading tables from YT
Нам нужна возможность ходить в YT через HTTP-прокси для чтения таблиц, используя C++-клиент не из контура Яндекса. К сожалению, сейчас такой возможности нет. В этом PR — черновик изменения, которого нам достаточно.
https://a.yandex-team.ru/review/4676436/details - тут это же изменение в YT + коммит с тем, как мы планируем использовать
annashest18 1 year ago
parent
commit
24a44df2ab

+ 1 - 0
yt/cpp/mapreduce/client/client.cpp

@@ -1296,6 +1296,7 @@ TClientPtr CreateClientImpl(
     context.Config = options.Config_ ? options.Config_ : TConfig::Get();
     context.Config = options.Config_ ? options.Config_ : TConfig::Get();
     context.TvmOnly = options.TvmOnly_;
     context.TvmOnly = options.TvmOnly_;
     context.UseTLS = options.UseTLS_;
     context.UseTLS = options.UseTLS_;
+    context.ProxyAddress = options.ProxyAddress_;
 
 
     context.ServerName = serverName;
     context.ServerName = serverName;
     if (serverName.find('.') == TString::npos &&
     if (serverName.find('.') == TString::npos &&

+ 2 - 1
yt/cpp/mapreduce/client/client_reader.cpp

@@ -191,7 +191,8 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
 
 
         try {
         try {
             const auto proxyName = GetProxyForHeavyRequest(Context_);
             const auto proxyName = GetProxyForHeavyRequest(Context_);
-            Response_ = Context_.HttpClient->Request(GetFullUrl(proxyName, Context_, header), requestId, header);
+            UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
+            Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
 
 
             Input_ = Response_->GetResponseStream();
             Input_ = Response_->GetResponseStream();
 
 

+ 4 - 0
yt/cpp/mapreduce/client/file_reader.cpp

@@ -149,6 +149,8 @@ NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context
         header.SetImpersonationUser(*context.ImpersonationUser);
         header.SetImpersonationUser(*context.ImpersonationUser);
     }
     }
 
 
+    UpdateHeaderForProxyIfNeed(hostName, context, header);
+
     header.AddTransactionId(transactionId);
     header.AddTransactionId(transactionId);
     header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
     header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
 
 
@@ -208,6 +210,8 @@ NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& co
         header.SetImpersonationUser(*context.ImpersonationUser);
         header.SetImpersonationUser(*context.ImpersonationUser);
     }
     }
 
 
+    UpdateHeaderForProxyIfNeed(hostName, context, header);
+
     header.AddTransactionId(transactionId);
     header.AddTransactionId(transactionId);
     header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
     header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
 
 

+ 2 - 0
yt/cpp/mapreduce/client/retry_heavy_write_request.cpp

@@ -51,6 +51,8 @@ void RetryHeavyWriteRequest(
             auto hostName = GetProxyForHeavyRequest(context);
             auto hostName = GetProxyForHeavyRequest(context);
             requestId = CreateGuidAsString();
             requestId = CreateGuidAsString();
 
 
+            UpdateHeaderForProxyIfNeed(hostName, context, header);
+
             header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
             header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
             header.SetRequestCompression(ToString(context.Config->ContentEncoding));
             header.SetRequestCompression(ToString(context.Config->ContentEncoding));
 
 

+ 1 - 0
yt/cpp/mapreduce/client/retryless_writer.h

@@ -55,6 +55,7 @@ public:
         TString requestId = CreateGuidAsString();
         TString requestId = CreateGuidAsString();
 
 
         auto hostName = GetProxyForHeavyRequest(context);
         auto hostName = GetProxyForHeavyRequest(context);
+        UpdateHeaderForProxyIfNeed(hostName, context, header);
         Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
         Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
         BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
         BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
     }
     }

+ 2 - 1
yt/cpp/mapreduce/http/context.cpp

@@ -12,7 +12,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs)
            lhs.ServiceTicketAuth == rhs.ServiceTicketAuth &&
            lhs.ServiceTicketAuth == rhs.ServiceTicketAuth &&
            lhs.HttpClient == rhs.HttpClient &&
            lhs.HttpClient == rhs.HttpClient &&
            lhs.UseTLS == rhs.UseTLS &&
            lhs.UseTLS == rhs.UseTLS &&
-           lhs.TvmOnly == rhs.TvmOnly;
+           lhs.TvmOnly == rhs.TvmOnly &&
+           lhs.ProxyAddress == rhs.ProxyAddress;
 }
 }
 
 
 bool operator!=(const TClientContext& lhs, const TClientContext& rhs)
 bool operator!=(const TClientContext& lhs, const TClientContext& rhs)

+ 1 - 0
yt/cpp/mapreduce/http/context.h

@@ -21,6 +21,7 @@ struct TClientContext
     bool TvmOnly = false;
     bool TvmOnly = false;
     bool UseTLS = false;
     bool UseTLS = false;
     TConfigPtr Config = TConfig::Get();
     TConfigPtr Config = TConfig::Get();
+    TMaybe<TString> ProxyAddress;
 };
 };
 
 
 bool operator==(const TClientContext& lhs, const TClientContext& rhs);
 bool operator==(const TClientContext& lhs, const TClientContext& rhs);

+ 17 - 0
yt/cpp/mapreduce/http/helpers.cpp

@@ -41,6 +41,23 @@ TString GetFullUrl(const TString& hostName, const TClientContext& context, THttp
     return Format("http://%v%v", hostName, header.GetUrl());
     return Format("http://%v%v", hostName, header.GetUrl());
 }
 }
 
 
+void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{ // Mutates `header` only when context.ProxyAddress is set; otherwise this is a no-op.
+    if (context.ProxyAddress) {
+        header.SetHostPort(Format("http://%v", hostName)); // NOTE(review): scheme is hard-coded to http regardless of context.UseTLS -- confirm intended.
+        header.SetProxyAddress(*context.ProxyAddress); // Hands the configured proxy address over to the header.
+    }
+}
+
+TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{ // Returns GetFullUrl() for context.ProxyAddress when it is set, otherwise GetFullUrl() for hostName.
+    if (context.ProxyAddress) {
+        THttpHeader emptyHeader(header.GetMethod(), "", false); // Throwaway header: same method as `header`, empty second argument.
+        return GetFullUrl(*context.ProxyAddress, context, emptyHeader); // URL is built from the proxy address and the throwaway header, not from `header`.
+    }
+    return GetFullUrl(hostName, context, header); // No proxy configured: delegate to GetFullUrl with the caller's arguments.
+}
+
 static TString GetParametersDebugString(const THttpHeader& header)
 static TString GetParametersDebugString(const THttpHeader& header)
 {
 {
     const auto& parameters = header.GetParameters();
     const auto& parameters = header.GetParameters();

+ 4 - 0
yt/cpp/mapreduce/http/helpers.h

@@ -14,6 +14,10 @@ TString CreateHostNameWithPort(const TString& name, const TClientContext& contex
 
 
 TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header);
 TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header);
 
 
+void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
+TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
 TString TruncateForLogs(const TString& text, size_t maxSize);
 TString TruncateForLogs(const TString& text, size_t maxSize);
 
 
 TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit);
 TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit);

+ 3 - 3
yt/cpp/mapreduce/http/host_manager.cpp

@@ -94,7 +94,7 @@ void THostManager::Reset()
 
 
 TString THostManager::GetProxyForHeavyRequest(const TClientContext& context)
 TString THostManager::GetProxyForHeavyRequest(const TClientContext& context)
 {
 {
-    auto cluster = context.ServerName;
+    auto cluster = context.ProxyAddress ? *context.ProxyAddress : context.ServerName;
     {
     {
         auto guard = Guard(Lock_);
         auto guard = Guard(Lock_);
         auto it = ClusterHosts_.find(cluster);
         auto it = ClusterHosts_.find(cluster);
@@ -121,10 +121,10 @@ THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& cont
     THttpHeader header("GET", hostsEndpoint, false);
     THttpHeader header("GET", hostsEndpoint, false);
 
 
     try {
     try {
-        auto hostName = context.ServerName;
         auto requestId = CreateGuidAsString();
         auto requestId = CreateGuidAsString();
         // TODO: we need to set socket timeout here
         // TODO: we need to set socket timeout here
-        auto response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+        UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
+        auto response = context.HttpClient->Request(GetFullUrlForProxy(context.ServerName, context, header), requestId, header);
         auto hosts = ParseJsonStringArray(response->GetResponse());
         auto hosts = ParseJsonStringArray(response->GetResponse());
         for (auto& host : hosts) {
         for (auto& host : hosts) {
             host = CreateHostNameWithPort(host, context);
             host = CreateHostNameWithPort(host, context);

Some files were not shown because too many files changed in this diff