Browse Source

Fix stats inside plan for full stats mode (#8553)

Ivan 6 months ago
parent
commit
35c8dfe081

+ 2 - 5
ydb/core/kqp/executer_actor/kqp_data_executer.cpp

@@ -215,9 +215,7 @@ public:
                 YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
         }
 
-        auto& response = *ResponseEv->Record.MutableResponse();
-
-        FillResponseStats(Ydb::StatusIds::SUCCESS);
+        ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
         Counters->TxProxyMon->ReportStatusOK->Inc();
 
         auto addLocks = [this](const auto& data) {
@@ -257,7 +255,7 @@ public:
             if (LockHandle) {
                 ResponseEv->LockHandle = std::move(LockHandle);
             }
-            BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
+            BuildLocks(*ResponseEv->Record.MutableResponse()->MutableResult()->MutableLocks(), Locks);
         }
 
         auto resultSize = ResponseEv->GetByteSize();
@@ -283,7 +281,6 @@ public:
 
         ExecuterSpan.EndOk();
 
-        Request.Transactions.crop(0);
         AlreadyReplied = true;
         PassAway();
     }

+ 60 - 60
ydb/core/kqp/executer_actor/kqp_executer_impl.h

@@ -177,10 +177,10 @@ public:
     }
 
     void ReportEventElapsedTime() {
-        if (Stats) {
-            ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
-            Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros);
-        }
+        YQL_ENSURE(Stats);
+
+        ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
+        Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros);
     }
 
 protected:
@@ -330,11 +330,10 @@ protected:
         }
 
         YQL_ENSURE(channel.DstTask == 0);
+        YQL_ENSURE(Stats);
 
-        if (Stats) {
-            Stats->ResultBytes += batch.Size();
-            Stats->ResultRows += batch.RowCount();
-        }
+        Stats->ResultBytes += batch.Size();
+        Stats->ResultRows += batch.RowCount();
 
         LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId
             << ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
@@ -391,7 +390,9 @@ protected:
             << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
             << ", stats: " << state.GetStats());
 
-        if (Stats && state.HasStats() && Request.ProgressStatsPeriod) {
+        YQL_ENSURE(Stats);
+
+        if (state.HasStats() && Request.ProgressStatsPeriod) {
             Stats->UpdateTaskStats(taskId, state.GetStats());
             auto now = TInstant::Now();
             if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -418,13 +419,11 @@ protected:
                 if (Planner->CompletedCA(taskId, computeActor)) {
                     ExtraData[computeActor].Swap(state.MutableExtraData());
 
-                    if (Stats) {
-                        Stats->AddComputeActorStats(
-                            computeActor.NodeId(),
-                            std::move(*state.MutableStats()),
-                            TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
-                        );
-                    }
+                    Stats->AddComputeActorStats(
+                        computeActor.NodeId(),
+                        std::move(*state.MutableStats()),
+                        TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
+                    );
 
                     LastTaskId = taskId;
                     LastComputeActorId = computeActor.ToString();
@@ -512,9 +511,9 @@ protected:
         auto now = TAppData::TimeProvider->Now();
         StartResolveTime = now;
 
-        if (Stats) {
-            Stats->StartTs = now;
-        }
+        YQL_ENSURE(Stats);
+
+        Stats->StartTs = now;
     }
 
     TMaybe<size_t> FindReadRangesSource(const NKqpProto::TKqpPhyStage& stage) {
@@ -1167,8 +1166,9 @@ protected:
                 : Nothing();
 
             YQL_ENSURE(!shardsResolved || nodeId);
+            YQL_ENSURE(Stats);
 
-            if (shardId && Stats) {
+            if (shardId) {
                 Stats->AffectedShards.insert(*shardId);
             }
 
@@ -1236,11 +1236,13 @@ protected:
 
         if (partitions.size() > 0 && source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards()) {
             auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv());
-            if (Stats) {
-                for (auto& [shardId, _] : partitions) {
-                    Stats->AffectedShards.insert(shardId);
-                }
+
+            YQL_ENSURE(Stats);
+
+            for (auto& [shardId, _] : partitions) {
+                Stats->AffectedShards.insert(shardId);
             }
+
             if (shardInfo.KeyReadRanges) {
                 addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards());
                 fillRangesForTasks();
@@ -1507,6 +1509,8 @@ protected:
         THashMap<ui64, ui64> assignedShardsCount;
         auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
 
+        YQL_ENSURE(Stats);
+
         const auto& tableInfo = stageInfo.Meta.TableConstInfo;
         const auto& keyTypes = tableInfo->KeyColumnTypes;
         ui32 metaId = 0;
@@ -1535,7 +1539,7 @@ protected:
                 nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
             }
 
-            if (Stats && CollectProfileStats(Request.StatsMode)) {
+            if (CollectProfileStats(Request.StatsMode)) {
                 for (auto&& i : nodeShards) {
                     Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size());
                 }
@@ -1720,7 +1724,7 @@ protected:
             ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
         }
 
-        FillResponseStats(Ydb::StatusIds::TIMEOUT);
+        ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::TIMEOUT);
 
         // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
         if (abortSender != Target) {
@@ -1730,38 +1734,9 @@ protected:
 
         LOG_E("Sending timeout response to: " << Target);
 
-        Request.Transactions.crop(0);
         this->Shutdown();
     }
 
-    void FillResponseStats(Ydb::StatusIds::StatusCode status) {
-        auto& response = *ResponseEv->Record.MutableResponse();
-
-        response.SetStatus(status);
-
-        if (Stats) {
-            ReportEventElapsedTime();
-
-            Stats->FinishTs = TInstant::Now();
-            Stats->Finish();
-
-            if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
-                for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
-                    const auto& tx = Request.Transactions[txId].Body;
-                    auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
-                    response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
-                }
-            }
-
-            if (Stats->CollectStatsByLongTasks) {
-                const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
-                if (!txPlansWithStats.empty()) {
-                    LOG_N("Full stats: " << txPlansWithStats);
-                }
-            }
-        }
-    }
-
     virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
         google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
     {
@@ -1775,8 +1750,7 @@ protected:
         AlreadyReplied = true;
         auto& response = *ResponseEv->Record.MutableResponse();
 
-        FillResponseStats(status);
-
+        response.SetStatus(status);
         response.MutableIssues()->Swap(issues);
 
         LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
@@ -1795,7 +1769,6 @@ protected:
         ExecuterSpan.EndError(response.DebugString());
         ExecuterStateSpan.EndError(response.DebugString());
 
-        Request.Transactions.crop(0);
         this->Shutdown();
     }
 
@@ -1873,8 +1846,35 @@ protected:
     void PassAway() override {
         YQL_ENSURE(AlreadyReplied && ResponseEv);
 
-        // Actualize stats with the last stats from terminated CAs, but keep the status.
-        FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
+        // Fill response stats
+        {
+            auto& response = *ResponseEv->Record.MutableResponse();
+
+            YQL_ENSURE(Stats);
+
+            ReportEventElapsedTime();
+
+            Stats->FinishTs = TInstant::Now();
+            Stats->Finish();
+
+            if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
+                response.MutableResult()->MutableStats()->ClearTxPlansWithStats();
+                for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
+                    const auto& tx = Request.Transactions[txId].Body;
+                    auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
+                    response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
+                }
+            }
+
+            if (Stats->CollectStatsByLongTasks) {
+                const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
+                if (!txPlansWithStats.empty()) {
+                    LOG_N("Full stats: " << response.GetResult().GetStats());
+                }
+            }
+        }
+
+        Request.Transactions.crop(0);
         this->Send(Target, ResponseEv.release());
 
         for (auto channelPair: ResultChannelProxies) {

+ 5 - 1
ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

@@ -701,6 +701,10 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
         }
 
         // checking whether the task is long
+
+        // TODO(ilezhankin): investigate - for some reason `task.FinishTimeMs` may be large (or small?)
+        //      enough to result in an enormous duration - triggering the "long tasks" mode.
+
         auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs());
         bool longTask = taskDuration > collectLongTaskStatsTimeout;
         if (longTask) {
@@ -1126,7 +1130,7 @@ void TQueryExecutionStats::Finish() {
     Result->SetCpuTimeUs(Result->GetCpuTimeUs() + ExecuterCpuTime.MicroSeconds());
     Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds());
 
-    // Result->Result* feilds are (temporary?) commented out in proto due to lack of use
+    // Result->Result* fields are (temporary?) commented out in proto due to lack of use
     //
     // Result->SetResultBytes(ResultBytes);
     // Result->SetResultRows(ResultRows);

+ 1 - 1
ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

@@ -276,7 +276,7 @@ public:
         YQL_ENSURE(!AlreadyReplied);
         AlreadyReplied = true;
 
-        FillResponseStats(Ydb::StatusIds::SUCCESS);
+        ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
 
         LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
 

+ 3 - 3
ydb/core/kqp/opt/kqp_query_plan.cpp

@@ -2101,9 +2101,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
 
         newPlans.AppendValue(ReconstructQueryPlanRec(
             plan.GetMapSafe().at("Plans").GetArraySafe()[0],
-            0, 
-            planIndex, 
-            precomputes, 
+            0,
+            planIndex,
+            precomputes,
             nodeCounter));
 
         newPlans.AppendValue(lookupPlan);

+ 3 - 3
ydb/core/kqp/rm_service/kqp_rm_service.h

@@ -139,9 +139,9 @@ public:
     }
 
     TString ToString() const {
-        auto res = TStringBuilder() << "TxResourcesInfo{ "
+        auto res = TStringBuilder() << "TxResourcesInfo { "
             << "TxId: " << TxId
-            << "Database: " << Database;
+            << ", Database: " << Database;
 
         if (!PoolId.empty()) {
             res << ", PoolId: " << PoolId
@@ -149,7 +149,7 @@ public:
         }
 
         res << ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
-            << ", extra allocations " << TxScanQueryMemory.load()
+            << ", tx total allocations " << TxScanQueryMemory.load()
             << ", execution units: " << TxExecutionUnits.load()
             << ", started at: " << CreatedAt
             << " }";

+ 4 - 2
ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

@@ -313,12 +313,12 @@ protected:
     void OnMemoryLimitExceptionHandler() {
         TString memoryConsumptionDetails = MemoryLimits.MemoryQuotaManager->MemoryConsumptionDetails();
         TStringBuilder failureReason = TStringBuilder()
-            << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
+            << "Mkql memory limit exceeded, allocated by task " << Task.GetId() << ": " << GetMkqlMemoryLimit()
             << ", host: " << HostName()
             << ", canAllocateExtraMemory: " << CanAllocateExtraMemory;
 
         if (!memoryConsumptionDetails.empty()) {
-            failureReason << ", memory manager details: " << memoryConsumptionDetails;
+            failureReason << ", memory manager details for current node: " << memoryConsumptionDetails;
         }
 
         InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, failureReason);
@@ -1825,6 +1825,8 @@ public:
                     }
                 }
             }
+        } else {
+            // TODO: what should happen in this case?
         }
 
         static_cast<TDerived*>(this)->FillExtraStats(dst, last);

+ 1 - 1
ydb/library/yql/dq/actors/protos/dq_stats.proto

@@ -377,7 +377,7 @@ message TDqExecutionStats {
     uint64 CpuTimeUs = 1;  // total cpu time, executer + compute actors + ...
     uint64 DurationUs = 2; // execution wall time
 
-    // these feilds are never used, needs to be reviewed
+    // these fields are never used, needs to be reviewed
     reserved 3; // uint64 ResultRows = 3;
     reserved 4; // uint64 ResultBytes = 4;