Browse Source

Allow to forget consumption peaks (#7763)

Mikhail Surin 7 months ago
parent
commit
c70d3ebbec

+ 8 - 3
ydb/core/kqp/node_service/kqp_node_service.cpp

@@ -85,6 +85,8 @@ public:
             SetPriorities(config.GetPoolsConfiguration());
         }
         Scheduler.ReportCounters(counters);
+        AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
+        Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
     }
 
     void Bootstrap() {
@@ -104,7 +106,7 @@ public:
         }
 
         Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag));
-        Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
+        Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
         Become(&TKqpNodeService::WorkState);
     }
 
@@ -343,7 +345,7 @@ private:
     void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
         if (ev->Get()->Tag == WakeAdvanceTimeTag) {
             Scheduler.AdvanceTime(TMonotonic::Now());
-            Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
+            Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
         }
         if (ev->Get()->Tag == WakeCleaunupTag) {
             Schedule(TDuration::Seconds(1), ev->Release().Release());
@@ -395,9 +397,11 @@ private:
             SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration());
         }
 
+        AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
+        Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
+
         auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
         Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
-
     }
 
     void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) {
@@ -520,6 +524,7 @@ private:
     const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
 
     TComputeScheduler Scheduler;
+    TDuration AdvanceTimeInterval;
 
     //state sharded by TxId
     std::shared_ptr<TNodeServiceState> State_;

+ 12 - 2
ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

@@ -134,6 +134,8 @@ public:
     static constexpr double BatchCalcDecay = 0;
     TDuration BatchTime = AvgBatch;
 
+    TDuration OverflowToleranceTimeout = TDuration::Seconds(1);
+
     static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10);
 
     size_t Wakeups = 0;
@@ -212,6 +214,7 @@ struct TComputeScheduler::TImpl {
 
     TIntrusivePtr<TKqpCounters> Counters;
     TDuration SmoothPeriod = TDuration::MilliSeconds(100);
+    TDuration ForgetInteval = TDuration::Seconds(2);
 
     TDuration MaxDelay = TDuration::Seconds(10);
 
@@ -361,10 +364,13 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) {
             }
             double delta = 0;
 
-            v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load();
+            auto tracked = Impl->Records[i]->TrackedMicroSeconds.load();
             v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight;
             v.Next()->LastNowRecalc = now;
-            v.Next()->TrackedBefore = Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore);
+            v.Next()->TrackedBefore = 
+                Max<ssize_t>(
+                    tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight, 
+                    Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
 
             if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
                 delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
@@ -425,6 +431,10 @@ void TComputeScheduler::SetMaxDeviation(TDuration period) {
     Impl->SmoothPeriod = period;
 }
 
+void TComputeScheduler::SetForgetInterval(TDuration period) {
+    Impl->ForgetInteval = period;
+}
+
 bool TComputeScheduler::Disabled(TString group) {
     auto ptr = Impl->PoolId.FindPtr(group);
     return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled;

+ 1 - 0
ydb/core/kqp/runtime/kqp_compute_scheduler.h

@@ -74,6 +74,7 @@ public:
 
     void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now);
     void SetMaxDeviation(TDuration);
+    void SetForgetInterval(TDuration);
     ::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const;
 
     TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);

+ 9 - 2
ydb/core/protos/table_service_config.proto

@@ -305,7 +305,7 @@ message TTableServiceConfig {
 
     message TSubPoolsConfiguration {
         repeated TComputePoolConfiguration SubPools = 1;
-    };
+    }
 
     message TComputePoolConfiguration {
         optional double MaxCpuShare = 1;
@@ -313,8 +313,15 @@ message TTableServiceConfig {
             string Name = 2;
             TSubPoolsConfiguration SubPoolsConfiguration = 3;
         }
-    };
+    }
+
+    message TComputeSchedulerSettings {
+        optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000];
+        optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000];
+    }
+
     optional TComputePoolConfiguration PoolsConfiguration = 68;
+    optional TComputeSchedulerSettings ComputeSchedulerSettings = 70;
 
     optional bool EnableRowsDuplicationCheck = 69 [ default = false ];
 };