|
@@ -1,11 +1,9 @@
|
|
|
-#include "base_compute_actor.h"
|
|
|
+#include "base_status_updater_actor.h"
|
|
|
|
|
|
-#include <ydb/core/fq/libs/common/compression.h>
|
|
|
#include <ydb/core/fq/libs/common/util.h>
|
|
|
#include <ydb/core/fq/libs/compute/common/metrics.h>
|
|
|
#include <ydb/core/fq/libs/compute/common/retry_actor.h>
|
|
|
#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
|
|
|
-#include <ydb/core/fq/libs/compute/common/utils.h>
|
|
|
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
|
|
|
#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
|
|
|
#include <ydb/core/fq/libs/ydb/ydb.h>
|
|
@@ -14,7 +12,6 @@
|
|
|
|
|
|
#include <ydb/library/yql/dq/actors/dq.h>
|
|
|
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
|
|
|
-#include <ydb/library/yql/public/issue/yql_issue_message.h>
|
|
|
|
|
|
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
|
|
|
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
|
|
@@ -37,7 +34,7 @@ namespace NFq {
|
|
|
using namespace NActors;
|
|
|
using namespace NFq;
|
|
|
|
|
|
-class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
|
|
|
+class TStatusTrackerActor : public TBaseStatusUpdaterActor<TStatusTrackerActor> {
|
|
|
public:
|
|
|
using IRetryPolicy = IRetryPolicy<const TEvYdbCompute::TEvGetOperationResponse::TPtr&>;
|
|
|
|
|
@@ -70,7 +67,7 @@ public:
|
|
|
};
|
|
|
|
|
|
TStatusTrackerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
|
|
|
- : TBaseComputeActor(queryCounters, "StatusTracker")
|
|
|
+ : TBaseStatusUpdaterActor(params.Config.GetCommon(), queryCounters, "StatusTracker")
|
|
|
, Params(params)
|
|
|
, Parent(parent)
|
|
|
, Connector(connector)
|
|
@@ -78,8 +75,9 @@ public:
|
|
|
, OperationId(operationId)
|
|
|
, Counters(GetStepCountersSubgroup())
|
|
|
, BackoffTimer(20, 1000)
|
|
|
- , Compressor(params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), params.Config.GetCommon().GetQueryArtifactsCompressionMinSize())
|
|
|
- {}
|
|
|
+ {
|
|
|
+ SetPingCounters(Counters.GetCounters(ERequestType::RT_PING));
|
|
|
+ }
|
|
|
|
|
|
static constexpr char ActorName[] = "FQ_STATUS_TRACKER";
|
|
|
|
|
@@ -95,21 +93,17 @@ public:
|
|
|
)
|
|
|
|
|
|
void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
|
|
|
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
|
|
|
- pingCounters->InFly->Dec();
|
|
|
+ OnPingRequestFinish(ev.Get()->Get()->Success);
|
|
|
|
|
|
if (ev->Cookie) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
|
|
|
if (ev.Get()->Get()->Success) {
|
|
|
- pingCounters->Ok->Inc();
|
|
|
LOG_I("Information about the status of operation is stored");
|
|
|
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus, ComputeStatus));
|
|
|
CompleteAndPassAway();
|
|
|
} else {
|
|
|
- pingCounters->Error->Inc();
|
|
|
LOG_E("Error saving information about the status of operation");
|
|
|
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus, ComputeStatus));
|
|
|
FailedAndPassAway();
|
|
@@ -140,7 +134,6 @@ public:
|
|
|
}
|
|
|
|
|
|
ReportPublicCounters(response.QueryStats);
|
|
|
- StartTime = TInstant::Now();
|
|
|
LOG_D("Execution status: " << static_cast<int>(response.ExecStatus));
|
|
|
switch (response.ExecStatus) {
|
|
|
case NYdb::NQuery::EExecStatus::Unspecified:
|
|
@@ -217,75 +210,51 @@ public:
|
|
|
Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
|
|
|
}
|
|
|
|
|
|
- void UpdateProgress() {
|
|
|
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
|
|
|
- pingCounters->InFly->Inc();
|
|
|
+ std::pair<Fq::Private::PingTaskRequest, double> GetPingTaskRequestWithStatistic(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode) {
|
|
|
Fq::Private::PingTaskRequest pingTaskRequest;
|
|
|
- PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
|
|
|
+ double cpuUsage = 0.0;
|
|
|
try {
|
|
|
- pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
|
|
|
+ pingTaskRequest = GetPingTaskRequestStatistics(computeStatus, pendingStatusCode, Issues, QueryStats, &cpuUsage);
|
|
|
} catch(const NJson::TJsonException& ex) {
|
|
|
LOG_E("Error statistics conversion: " << ex.what());
|
|
|
}
|
|
|
+
|
|
|
+ return { pingTaskRequest, cpuUsage };
|
|
|
+ }
|
|
|
+
|
|
|
+ void UpdateProgress() {
|
|
|
+ OnPingRequestStart();
|
|
|
+
|
|
|
+ Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequestWithStatistic(std::nullopt, std::nullopt).first;
|
|
|
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1);
|
|
|
}
|
|
|
|
|
|
+ void UpdateCpuQuota(double cpuUsage) {
|
|
|
+ TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
|
|
|
+ if (cpuUsage && duration) {
|
|
|
+ Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
void Failed() {
|
|
|
LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
|
|
|
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
|
|
|
- pingCounters->InFly->Inc();
|
|
|
- Fq::Private::PingTaskRequest pingTaskRequest;
|
|
|
- NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
|
|
|
- pingTaskRequest.set_pending_status_code(StatusCode);
|
|
|
- PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
|
|
|
- try {
|
|
|
- TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
|
|
|
- double cpuUsage = 0.0;
|
|
|
- pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
|
|
|
- if (duration && cpuUsage) {
|
|
|
- Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
|
|
|
- }
|
|
|
- } catch(const NJson::TJsonException& ex) {
|
|
|
- LOG_E("Error statistics conversion: " << ex.what());
|
|
|
- }
|
|
|
+ OnPingRequestStart();
|
|
|
+
|
|
|
+ auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(std::nullopt, StatusCode);
|
|
|
+ UpdateCpuQuota(cpuUsage);
|
|
|
+
|
|
|
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
|
|
|
}
|
|
|
|
|
|
void Complete() {
|
|
|
LOG_I("Execution status: Complete " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
|
|
|
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
|
|
|
- pingCounters->InFly->Inc();
|
|
|
- Fq::Private::PingTaskRequest pingTaskRequest;
|
|
|
- NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
|
|
|
- ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
|
|
|
- pingTaskRequest.set_status(ComputeStatus);
|
|
|
- PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
|
|
|
- try {
|
|
|
- TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
|
|
|
- double cpuUsage = 0.0;
|
|
|
- pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
|
|
|
- if (duration && cpuUsage) {
|
|
|
- Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
|
|
|
- }
|
|
|
- } catch(const NJson::TJsonException& ex) {
|
|
|
- LOG_E("Error statistics conversion: " << ex.what());
|
|
|
- }
|
|
|
- Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
|
|
|
- }
|
|
|
+ OnPingRequestStart();
|
|
|
|
|
|
- void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const {
|
|
|
- if (Compressor.IsEnabled()) {
|
|
|
- auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr);
|
|
|
- request.mutable_ast_compressed()->set_method(astCompressionMethod);
|
|
|
- request.mutable_ast_compressed()->set_data(astCompressed);
|
|
|
+ ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
|
|
|
+ auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(ComputeStatus, std::nullopt);
|
|
|
+ UpdateCpuQuota(cpuUsage);
|
|
|
|
|
|
- auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan);
|
|
|
- request.mutable_plan_compressed()->set_method(planCompressionMethod);
|
|
|
- request.mutable_plan_compressed()->set_data(planCompressed);
|
|
|
- } else {
|
|
|
- request.set_ast(expr);
|
|
|
- request.set_plan(plan);
|
|
|
- }
|
|
|
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
|
|
|
}
|
|
|
|
|
|
private:
|
|
@@ -295,14 +264,12 @@ private:
|
|
|
TActorId Pinger;
|
|
|
NYdb::TOperation::TOperationId OperationId;
|
|
|
TCounters Counters;
|
|
|
- TInstant StartTime;
|
|
|
NYql::TIssues Issues;
|
|
|
NYdb::EStatus Status = NYdb::EStatus::SUCCESS;
|
|
|
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
|
|
|
NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED;
|
|
|
Ydb::TableStats::QueryStats QueryStats;
|
|
|
NKikimr::TBackoffTimer BackoffTimer;
|
|
|
- const TCompressor Compressor;
|
|
|
FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING;
|
|
|
};
|
|
|
|