|
@@ -130,16 +130,16 @@ private:
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
|
|
|
- : Method(method)
|
|
|
- , Command(command)
|
|
|
- , IsApi(isApi)
|
|
|
+ : Method_(method)
|
|
|
+ , Command_(command)
|
|
|
+ , IsApi_(isApi)
|
|
|
{ }
|
|
|
|
|
|
void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
|
|
|
{
|
|
|
- auto it = Parameters.find(key);
|
|
|
- if (it == Parameters.end()) {
|
|
|
- Parameters.emplace(key, std::move(value));
|
|
|
+ auto it = Parameters_.find(key);
|
|
|
+ if (it == Parameters_.end()) {
|
|
|
+ Parameters_.emplace(key, std::move(value));
|
|
|
} else {
|
|
|
if (overwrite) {
|
|
|
it->second = std::move(value);
|
|
@@ -158,12 +158,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
|
|
|
|
|
|
void THttpHeader::RemoveParameter(const TString& key)
|
|
|
{
|
|
|
- Parameters.erase(key);
|
|
|
+ Parameters_.erase(key);
|
|
|
}
|
|
|
|
|
|
TNode THttpHeader::GetParameters() const
|
|
|
{
|
|
|
- return Parameters;
|
|
|
+ return Parameters_;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
|
|
@@ -202,81 +202,81 @@ void THttpHeader::AddMutationId()
|
|
|
|
|
|
bool THttpHeader::HasMutationId() const
|
|
|
{
|
|
|
- return Parameters.contains("mutation_id");
|
|
|
+ return Parameters_.contains("mutation_id");
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetToken(const TString& token)
|
|
|
{
|
|
|
- Token = token;
|
|
|
+ Token_ = token;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetProxyAddress(const TString& proxyAddress)
|
|
|
{
|
|
|
- ProxyAddress = proxyAddress;
|
|
|
+ ProxyAddress_ = proxyAddress;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetHostPort(const TString& hostPort)
|
|
|
{
|
|
|
- HostPort = hostPort;
|
|
|
+ HostPort_ = hostPort;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
|
|
|
{
|
|
|
- ImpersonationUser = impersonationUser;
|
|
|
+ ImpersonationUser_ = impersonationUser;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetServiceTicket(const TString& ticket)
|
|
|
{
|
|
|
- ServiceTicket = ticket;
|
|
|
+ ServiceTicket_ = ticket;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
|
|
|
{
|
|
|
- InputFormat = format;
|
|
|
+ InputFormat_ = format;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
|
|
|
{
|
|
|
- OutputFormat = format;
|
|
|
+ OutputFormat_ = format;
|
|
|
}
|
|
|
|
|
|
TMaybe<TFormat> THttpHeader::GetOutputFormat() const
|
|
|
{
|
|
|
- return OutputFormat;
|
|
|
+ return OutputFormat_;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetRequestCompression(const TString& compression)
|
|
|
{
|
|
|
- RequestCompression = compression;
|
|
|
+ RequestCompression_ = compression;
|
|
|
}
|
|
|
|
|
|
void THttpHeader::SetResponseCompression(const TString& compression)
|
|
|
{
|
|
|
- ResponseCompression = compression;
|
|
|
+ ResponseCompression_ = compression;
|
|
|
}
|
|
|
|
|
|
TString THttpHeader::GetCommand() const
|
|
|
{
|
|
|
- return Command;
|
|
|
+ return Command_;
|
|
|
}
|
|
|
|
|
|
TString THttpHeader::GetUrl(bool needProxy) const
|
|
|
{
|
|
|
TStringStream url;
|
|
|
|
|
|
- if (needProxy && !ProxyAddress.empty()) {
|
|
|
- url << ProxyAddress << "/";
|
|
|
+ if (needProxy && !ProxyAddress_.empty()) {
|
|
|
+ url << ProxyAddress_ << "/";
|
|
|
return url.Str();
|
|
|
}
|
|
|
|
|
|
- if (!ProxyAddress.empty()) {
|
|
|
- url << HostPort;
|
|
|
+ if (!ProxyAddress_.empty()) {
|
|
|
+ url << HostPort_;
|
|
|
}
|
|
|
|
|
|
- if (IsApi) {
|
|
|
- url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
|
|
|
+ if (IsApi_) {
|
|
|
+ url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_;
|
|
|
} else {
|
|
|
- url << "/" << Command;
|
|
|
+ url << "/" << Command_;
|
|
|
}
|
|
|
|
|
|
return url.Str();
|
|
@@ -284,16 +284,16 @@ TString THttpHeader::GetUrl(bool needProxy) const
|
|
|
|
|
|
bool THttpHeader::ShouldAcceptFraming() const
|
|
|
{
|
|
|
- return TConfig::Get()->CommandsWithFraming.contains(Command);
|
|
|
+ return TConfig::Get()->CommandsWithFraming.contains(Command_);
|
|
|
}
|
|
|
|
|
|
TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
|
|
|
{
|
|
|
TStringStream result;
|
|
|
|
|
|
- result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
|
|
|
+ result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n";
|
|
|
|
|
|
- GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
|
|
|
+ GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result);
|
|
|
|
|
|
if (ShouldAcceptFraming()) {
|
|
|
result << "X-YT-Accept-Framing: 1\r\n";
|
|
@@ -311,25 +311,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
|
|
|
headers->Add("Host", hostName);
|
|
|
headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
|
|
|
|
|
|
- if (!Token.empty()) {
|
|
|
- headers->Add("Authorization", "OAuth " + Token);
|
|
|
+ if (!Token_.empty()) {
|
|
|
+ headers->Add("Authorization", "OAuth " + Token_);
|
|
|
}
|
|
|
- if (!ServiceTicket.empty()) {
|
|
|
- headers->Add("X-Ya-Service-Ticket", ServiceTicket);
|
|
|
+ if (!ServiceTicket_.empty()) {
|
|
|
+ headers->Add("X-Ya-Service-Ticket", ServiceTicket_);
|
|
|
}
|
|
|
- if (!ImpersonationUser.empty()) {
|
|
|
- headers->Add("X-Yt-User-Name", ImpersonationUser);
|
|
|
+ if (!ImpersonationUser_.empty()) {
|
|
|
+ headers->Add("X-Yt-User-Name", ImpersonationUser_);
|
|
|
}
|
|
|
|
|
|
- if (Method == "PUT" || Method == "POST") {
|
|
|
+ if (Method_ == "PUT" || Method_ == "POST") {
|
|
|
headers->Add("Transfer-Encoding", "chunked");
|
|
|
}
|
|
|
|
|
|
headers->Add("X-YT-Correlation-Id", requestId);
|
|
|
headers->Add("X-YT-Header-Format", "<format=text>yson");
|
|
|
|
|
|
- headers->Add("Content-Encoding", RequestCompression);
|
|
|
- headers->Add("Accept-Encoding", ResponseCompression);
|
|
|
+ headers->Add("Content-Encoding", RequestCompression_);
|
|
|
+ headers->Add("Accept-Encoding", ResponseCompression_);
|
|
|
|
|
|
auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
|
|
|
static const size_t maxHttpHeaderSize = 64 << 10;
|
|
@@ -353,14 +353,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
|
|
|
} while (ptr != finish);
|
|
|
};
|
|
|
|
|
|
- if (InputFormat) {
|
|
|
- printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config));
|
|
|
+ if (InputFormat_) {
|
|
|
+ printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config));
|
|
|
}
|
|
|
- if (OutputFormat) {
|
|
|
- printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config));
|
|
|
+ if (OutputFormat_) {
|
|
|
+ printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config));
|
|
|
}
|
|
|
if (includeParameters) {
|
|
|
- printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters));
|
|
|
+ printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_));
|
|
|
}
|
|
|
|
|
|
return NHttp::THeadersPtrWrapper(std::move(headers));
|
|
@@ -368,7 +368,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
|
|
|
|
|
|
const TString& THttpHeader::GetMethod() const
|
|
|
{
|
|
|
- return Method;
|
|
|
+ return Method_;
|
|
|
}
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
@@ -679,8 +679,8 @@ THttpResponse::THttpResponse(
|
|
|
IInputStream* socketStream,
|
|
|
const TString& requestId,
|
|
|
const TString& hostName)
|
|
|
- : HttpInput_(socketStream)
|
|
|
- , RequestId_(requestId)
|
|
|
+ : RequestId_(requestId)
|
|
|
+ , HttpInput_(socketStream)
|
|
|
, HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
|
|
|
, Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
|
|
|
{
|
|
@@ -887,64 +887,63 @@ size_t THttpResponse::UnframeSkip(size_t len)
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
THttpRequest::THttpRequest()
|
|
|
-{
|
|
|
- RequestId = CreateGuidAsString();
|
|
|
-}
|
|
|
+ : RequestId_(CreateGuidAsString())
|
|
|
+{ }
|
|
|
|
|
|
THttpRequest::THttpRequest(const TString& requestId)
|
|
|
- : RequestId(requestId)
|
|
|
+ : RequestId_(requestId)
|
|
|
{ }
|
|
|
|
|
|
THttpRequest::~THttpRequest()
|
|
|
{
|
|
|
- if (!Connection) {
|
|
|
+ if (!Connection_) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (Input && Input->IsKeepAlive() && Input->IsExhausted()) {
|
|
|
+ if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) {
|
|
|
// We should return to the pool only connections where HTTP response was fully read.
|
|
|
// Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
|
|
|
- TConnectionPool::Get()->Release(Connection);
|
|
|
+ TConnectionPool::Get()->Release(Connection_);
|
|
|
} else {
|
|
|
- TConnectionPool::Get()->Invalidate(HostName, Connection);
|
|
|
+ TConnectionPool::Get()->Invalidate(HostName_, Connection_);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
TString THttpRequest::GetRequestId() const
|
|
|
{
|
|
|
- return RequestId;
|
|
|
+ return RequestId_;
|
|
|
}
|
|
|
|
|
|
void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
|
|
|
{
|
|
|
- HostName = std::move(hostName);
|
|
|
+ HostName_ = std::move(hostName);
|
|
|
YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
|
|
|
- RequestId,
|
|
|
- HostName);
|
|
|
+ RequestId_,
|
|
|
+ HostName_);
|
|
|
|
|
|
StartTime_ = TInstant::Now();
|
|
|
- Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout);
|
|
|
+ Connection_ = TConnectionPool::Get()->Connect(HostName_, socketTimeout);
|
|
|
|
|
|
YT_LOG_DEBUG("REQ %v - connection #%v",
|
|
|
- RequestId,
|
|
|
- Connection->Id);
|
|
|
+ RequestId_,
|
|
|
+ Connection_->Id);
|
|
|
}
|
|
|
|
|
|
IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
|
|
|
{
|
|
|
- auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
|
|
|
+ auto strHeader = header.GetHeaderAsString(HostName_, RequestId_, includeParameters);
|
|
|
Url_ = header.GetUrl(true);
|
|
|
|
|
|
- LogRequest(header, Url_, includeParameters, RequestId, HostName);
|
|
|
+ LogRequest(header, Url_, includeParameters, RequestId_, HostName_);
|
|
|
|
|
|
LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
|
|
|
|
|
|
auto outputFormat = header.GetOutputFormat();
|
|
|
if (outputFormat && outputFormat->IsTextYson()) {
|
|
|
- LogResponse = true;
|
|
|
+ LogResponse_ = true;
|
|
|
}
|
|
|
|
|
|
- RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get());
|
|
|
+ RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get());
|
|
|
|
|
|
RequestStream_->Write(strHeader.data(), strHeader.size());
|
|
|
return RequestStream_.Get();
|
|
@@ -980,17 +979,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo
|
|
|
|
|
|
THttpResponse* THttpRequest::GetResponseStream()
|
|
|
{
|
|
|
- if (!Input) {
|
|
|
- SocketInput.Reset(new TSocketInput(*Connection->Socket.Get()));
|
|
|
+ if (!Input_) {
|
|
|
+ SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
|
|
|
if (TConfig::Get()->UseAbortableResponse) {
|
|
|
Y_ABORT_UNLESS(!Url_.empty());
|
|
|
- Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_));
|
|
|
+ Input_.Reset(new TAbortableHttpResponse(SocketInput_.Get(), RequestId_, HostName_, Url_));
|
|
|
} else {
|
|
|
- Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName));
|
|
|
+ Input_.Reset(new THttpResponse(SocketInput_.Get(), RequestId_, HostName_));
|
|
|
}
|
|
|
- Input->CheckErrorResponse();
|
|
|
+ Input_->CheckErrorResponse();
|
|
|
}
|
|
|
- return Input.Get();
|
|
|
+ return Input_.Get();
|
|
|
}
|
|
|
|
|
|
TString THttpRequest::GetResponse()
|
|
@@ -1003,15 +1002,15 @@ TString THttpRequest::GetResponse()
|
|
|
<< "HostName: " << GetResponseStream()->GetHostName() << "; "
|
|
|
<< LoggedAttributes_;
|
|
|
|
|
|
- if (LogResponse) {
|
|
|
+ if (LogResponse_) {
|
|
|
constexpr auto sizeLimit = 1 << 7;
|
|
|
YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
|
|
|
- RequestId,
|
|
|
+ RequestId_,
|
|
|
TruncateForLogs(result, sizeLimit),
|
|
|
loggedAttributes.Str());
|
|
|
} else {
|
|
|
YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
|
|
|
- RequestId,
|
|
|
+ RequestId_,
|
|
|
result.size(),
|
|
|
loggedAttributes.Str());
|
|
|
}
|
|
@@ -1024,8 +1023,8 @@ int THttpRequest::GetHttpCode() {
|
|
|
|
|
|
void THttpRequest::InvalidateConnection()
|
|
|
{
|
|
|
- TConnectionPool::Get()->Invalidate(HostName, Connection);
|
|
|
- Connection.Reset();
|
|
|
+ TConnectionPool::Get()->Invalidate(HostName_, Connection_);
|
|
|
+ Connection_.Reset();
|
|
|
}
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|