|
@@ -93,26 +93,34 @@ void TViewerPipeClient::SendDelayedRequests() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-TPathId TViewerPipeClient::GetPathId(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
|
|
- if (ev->Get()->Request->ResultSet.size() == 1) {
|
|
|
- if (ev->Get()->Request->ResultSet.begin()->Self) {
|
|
|
- const auto& info = ev->Get()->Request->ResultSet.begin()->Self->Info;
|
|
|
+TPathId TViewerPipeClient::GetPathId(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev) {
|
|
|
+ if (ev.Request->ResultSet.size() == 1) {
|
|
|
+ if (ev.Request->ResultSet.begin()->Self) {
|
|
|
+ const auto& info = ev.Request->ResultSet.begin()->Self->Info;
|
|
|
return TPathId(info.GetSchemeshardId(), info.GetPathId());
|
|
|
}
|
|
|
- if (ev->Get()->Request->ResultSet.begin()->TableId) {
|
|
|
- return ev->Get()->Request->ResultSet.begin()->TableId.PathId;
|
|
|
+ if (ev.Request->ResultSet.begin()->TableId) {
|
|
|
+ return ev.Request->ResultSet.begin()->TableId.PathId;
|
|
|
}
|
|
|
}
|
|
|
return {};
|
|
|
}
|
|
|
|
|
|
-TString TViewerPipeClient::GetPath(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
|
|
- if (ev->Get()->Request->ResultSet.size() == 1) {
|
|
|
- return CanonizePath(ev->Get()->Request->ResultSet.begin()->Path);
|
|
|
+TString TViewerPipeClient::GetPath(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev) {
|
|
|
+ if (ev.Request->ResultSet.size() == 1) {
|
|
|
+ return CanonizePath(ev.Request->ResultSet.begin()->Path);
|
|
|
}
|
|
|
return {};
|
|
|
}
|
|
|
|
|
|
+TPathId TViewerPipeClient::GetPathId(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
|
|
+ return GetPathId(*ev->Get());
|
|
|
+}
|
|
|
+
|
|
|
+TString TViewerPipeClient::GetPath(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
|
|
+ return GetPath(*ev->Get());
|
|
|
+}
|
|
|
+
|
|
|
bool TViewerPipeClient::IsSuccess(const std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& ev) {
|
|
|
return (ev->Request->ResultSet.size() == 1) && (ev->Request->ResultSet.begin()->Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok);
|
|
|
}
|
|
@@ -147,6 +155,23 @@ TString TViewerPipeClient::GetError(const std::unique_ptr<TEvTxProxySchemeCache:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+bool TViewerPipeClient::IsSuccess(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev) {
|
|
|
+ return ev->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok;
|
|
|
+}
|
|
|
+
|
|
|
+TString TViewerPipeClient::GetError(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev) {
|
|
|
+ switch (ev->Status) {
|
|
|
+ case TEvStateStorage::TEvBoardInfo::EStatus::Unknown:
|
|
|
+ return "Unknown";
|
|
|
+ case TEvStateStorage::TEvBoardInfo::EStatus::Ok:
|
|
|
+ return "Ok";
|
|
|
+ case TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable:
|
|
|
+ return "NotAvailable";
|
|
|
+ default:
|
|
|
+ return ::ToString(static_cast<int>(ev->Status));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void TViewerPipeClient::RequestHiveDomainStats(NNodeWhiteboard::TTabletId hiveId) {
|
|
|
TActorId pipeClient = ConnectTabletPipe(hiveId);
|
|
|
THolder<TEvHive::TEvRequestHiveDomainStats> request = MakeHolder<TEvHive::TEvRequestHiveDomainStats>();
|
|
@@ -540,10 +565,10 @@ void TViewerPipeClient::RequestStateStorageMetadataCacheEndpointsLookup(const TS
|
|
|
++Requests;
|
|
|
}
|
|
|
|
|
|
-std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
|
|
|
+std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(const TEvStateStorage::TEvBoardInfo& ev) {
|
|
|
std::vector<TNodeId> databaseNodes;
|
|
|
- if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
|
|
|
- for (const auto& [actorId, infoEntry] : ev->Get()->InfoEntries) {
|
|
|
+ if (ev.Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
|
|
|
+ for (const auto& [actorId, infoEntry] : ev.InfoEntries) {
|
|
|
databaseNodes.emplace_back(actorId.NodeId());
|
|
|
}
|
|
|
}
|
|
@@ -552,11 +577,20 @@ std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::
|
|
|
return databaseNodes;
|
|
|
}
|
|
|
|
|
|
+std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
|
|
|
+ return GetNodesFromBoardReply(*ev->Get());
|
|
|
+}
|
|
|
+
|
|
|
void TViewerPipeClient::InitConfig(const TCgiParameters& params) {
|
|
|
Followers = FromStringWithDefault(params.Get("followers"), Followers);
|
|
|
Metrics = FromStringWithDefault(params.Get("metrics"), Metrics);
|
|
|
WithRetry = FromStringWithDefault(params.Get("with_retry"), WithRetry);
|
|
|
MaxRequestsInFlight = FromStringWithDefault(params.Get("max_requests_in_flight"), MaxRequestsInFlight);
|
|
|
+ Database = params.Get("database");
|
|
|
+ if (!Database) {
|
|
|
+ Database = params.Get("tenant");
|
|
|
+ }
|
|
|
+ Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
|
|
|
}
|
|
|
|
|
|
void TViewerPipeClient::InitConfig(const TRequestSettings& settings) {
|
|
@@ -653,23 +687,43 @@ void TViewerPipeClient::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void TViewerPipeClient::HandleResolveResource(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
|
|
+ ResourceNavigateResponse->Set(std::move(ev));
|
|
|
+ if (ResourceNavigateResponse->IsOk()) {
|
|
|
+ TSchemeCacheNavigate::TEntry& entry(ResourceNavigateResponse->Get()->Request->ResultSet.front());
|
|
|
+ SharedDatabase = CanonizePath(entry.Path);
|
|
|
+ if (SharedDatabase == AppData()->TenantName) {
|
|
|
+ Direct = true;
|
|
|
+ return Bootstrap(); // retry bootstrap without redirect this time
|
|
|
+ }
|
|
|
+ DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(SharedDatabase);
|
|
|
+ } else {
|
|
|
+ ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - shared database not found"));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void TViewerPipeClient::HandleResolveDatabase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
|
|
- if (ev->Get()->Request->ResultSet.size() == 1 && ev->Get()->Request->ResultSet.begin()->Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
|
|
|
- TSchemeCacheNavigate::TEntry& entry(ev->Get()->Request->ResultSet.front());
|
|
|
- if (entry.DomainInfo) {
|
|
|
- if (entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
|
|
|
- RequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
|
|
|
- } else {
|
|
|
- RequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
|
|
|
- }
|
|
|
+ DatabaseNavigateResponse->Set(std::move(ev));
|
|
|
+ if (DatabaseNavigateResponse->IsOk()) {
|
|
|
+ TSchemeCacheNavigate::TEntry& entry(DatabaseNavigateResponse->Get()->Request->ResultSet.front());
|
|
|
+ if (entry.DomainInfo && entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
|
|
|
+ ResourceNavigateResponse = MakeRequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
|
|
|
+ Become(&TViewerPipeClient::StateResolveResource);
|
|
|
+ return;
|
|
|
}
|
|
|
+ DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
|
|
|
} else {
|
|
|
- ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database"));
|
|
|
+ ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - not found"));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void TViewerPipeClient::HandleResolveDatabase(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
|
|
|
- ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(ev)));
|
|
|
+void TViewerPipeClient::HandleResolve(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
|
|
|
+ DatabaseBoardInfoResponse->Set(std::move(ev));
|
|
|
+ if (DatabaseBoardInfoResponse->IsOk()) {
|
|
|
+ ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(DatabaseBoardInfoResponse->GetRef())));
|
|
|
+ } else {
|
|
|
+ ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - no nodes found"));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
void TViewerPipeClient::HandleTimeout() {
|
|
@@ -678,15 +732,33 @@ void TViewerPipeClient::HandleTimeout() {
|
|
|
|
|
|
STATEFN(TViewerPipeClient::StateResolveDatabase) {
|
|
|
switch (ev->GetTypeRewrite()) {
|
|
|
- hFunc(TEvStateStorage::TEvBoardInfo, HandleResolveDatabase);
|
|
|
+ hFunc(TEvStateStorage::TEvBoardInfo, HandleResolve);
|
|
|
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveDatabase);
|
|
|
cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+STATEFN(TViewerPipeClient::StateResolveResource) {
|
|
|
+ switch (ev->GetTypeRewrite()) {
|
|
|
+ hFunc(TEvStateStorage::TEvBoardInfo, HandleResolve);
|
|
|
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveResource);
|
|
|
+ cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void TViewerPipeClient::RedirectToDatabase(const TString& database) {
|
|
|
- RequestSchemeCacheNavigate(database);
|
|
|
- Become(&TViewerPipeClient::StateResolveDatabase, TDuration::MilliSeconds(1000), new TEvents::TEvWakeup());
|
|
|
+ DatabaseNavigateResponse = MakeRequestSchemeCacheNavigate(database);
|
|
|
+ Become(&TViewerPipeClient::StateResolveDatabase);
|
|
|
+}
|
|
|
+
|
|
|
+bool TViewerPipeClient::NeedToRedirect() {
|
|
|
+ Direct |= !Event->Get()->Request.GetHeader("X-Forwarded-From-Node").empty(); // we're already forwarding
|
|
|
+ Direct |= (Database == AppData()->TenantName) || Database.empty(); // we're already on the right node or don't use database filter
|
|
|
+ if (Database && !Direct) {
|
|
|
+ RedirectToDatabase(Database); // to find some dynamic node and redirect query there
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
void TViewerPipeClient::PassAway() {
|