Browse Source

Set KQP & Resource Broker limits from MemoryController (#7590)

kungurtsev 7 months ago
parent
commit
77e599f7dc

+ 81 - 124
ydb/core/memory_controller/memory_controller.cpp

@@ -1,4 +1,5 @@
 #include "memory_controller.h"
+#include "memory_controller_config.h"
 #include "memtable_collection.h"
 #include <ydb/core/base/counters.h>
 #include <ydb/core/base/memory_controller_iface.h>
@@ -8,6 +9,7 @@
 #include <ydb/core/node_whiteboard/node_whiteboard.h>
 #include <ydb/core/protos/memory_controller_config.pb.h>
 #include <ydb/core/protos/memory_stats.pb.h>
+#include <ydb/core/tablet/resource_broker.h>
 #include <ydb/core/tablet_flat/shared_sausagecache.h>
 #include <ydb/library/actors/core/actor_bootstrapped.h>
 #include <ydb/library/actors/core/log.h>
@@ -27,8 +29,23 @@ ui64 SafeDiff(ui64 a, ui64 b) {
 namespace {
 
 using namespace NActors;
+using namespace NResourceBroker;
 using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr;
 
+// Pair of memory limits that MemoryController hands over to ResourceBroker.
+// Two snapshots compare equal only when both limits match, which lets the
+// controller detect that nothing changed since the previous update.
+struct TResourceBrokerLimits {
+    // Overall ResourceBroker memory limit, in bytes.
+    // Zero-initialized so a default-constructed snapshot is well defined.
+    ui64 LimitBytes = 0;
+    // Memory limit for query execution, in bytes.
+    ui64 QueryExecutionLimitBytes = 0;
+
+    auto operator<=>(const TResourceBrokerLimits&) const = default;
+
+    // Renders both limits as "LimitBytes: <n> QueryExecutionLimitBytes: <m>".
+    TString ToString() const noexcept {
+        TStringBuilder result;
+        result << "LimitBytes: " << LimitBytes;
+        result << " QueryExecutionLimitBytes: " << QueryExecutionLimitBytes;
+        return result;
+    }
+};
+
 class TMemoryConsumer : public IMemoryConsumer {
 public:
     TMemoryConsumer(EMemoryConsumerKind kind, TActorId actorId)
@@ -52,27 +69,18 @@ private:
     std::atomic<ui64> Consumption = 0;
 };
 
-struct TConsumerConfig {
-    std::optional<float> MinPercent;
-    std::optional<ui64> MinBytes;
-    std::optional<float> MaxPercent;
-    std::optional<ui64> MaxBytes;
-    bool CanZeroLimit = false;
-};
-
 struct TConsumerState {
     const EMemoryConsumerKind Kind;
     const TActorId ActorId;
     const ui64 Consumption;
-    const TConsumerConfig Config;
     ui64 MinBytes = 0;
     ui64 MaxBytes = 0;
+    bool CanZeroLimit = false;
 
-    TConsumerState(const TMemoryConsumer& consumer, TConsumerConfig config)
+    TConsumerState(const TMemoryConsumer& consumer)
         : Kind(consumer.Kind)
         , ActorId(consumer.ActorId)
         , Consumption(consumer.GetConsumption())
-        , Config(config)
     {
     }
 
@@ -132,6 +140,8 @@ private:
             HFunc(TEvMemTableRegister, Handle);
             HFunc(TEvMemTableUnregister, Handle);
             HFunc(TEvMemTableCompacted, Handle);
+
+            HFunc(TEvResourceBroker::TEvConfigureResult, Handle);
         }
     }
 
@@ -144,9 +154,10 @@ private:
         auto processMemoryInfo = ProcessMemoryInfoProvider->Get();
 
         bool hasMemTotalHardLimit = false;
-        ui64 hardLimitBytes = GetHardLimitBytes(processMemoryInfo, hasMemTotalHardLimit);
-        ui64 softLimitBytes = GetSoftLimitBytes(hardLimitBytes);
-        ui64 targetUtilizationBytes = GetTargetUtilizationBytes(hardLimitBytes);
+        ui64 hardLimitBytes = GetHardLimitBytes(Config, processMemoryInfo, hasMemTotalHardLimit);
+        ui64 softLimitBytes = GetSoftLimitBytes(Config, hardLimitBytes);
+        ui64 targetUtilizationBytes = GetTargetUtilizationBytes(Config, hardLimitBytes);
+        ui64 activitiesLimitBytes = GetActivitiesLimitBytes(Config, hardLimitBytes);
 
         TVector<TConsumerState> consumers(::Reserve(Consumers.size()));
         ui64 consumersConsumption = 0;
@@ -189,6 +200,7 @@ private:
             << " MemTotal: " << processMemoryInfo.MemTotal << " MemAvailable: " << processMemoryInfo.MemAvailable
             << " AllocatedMemory: " << processMemoryInfo.AllocatedMemory << " AllocatorCachesMemory: " << processMemoryInfo.AllocatorCachesMemory
             << " HardLimit: " << hardLimitBytes << " SoftLimit: " << softLimitBytes << " TargetUtilization: " << targetUtilizationBytes
+            << " ActivitiesLimitBytes: " << activitiesLimitBytes
             << " ConsumersConsumption: " << consumersConsumption << " OtherConsumption: " << otherConsumption << " ExternalConsumption: " << externalConsumption
             << " TargetConsumersConsumption: " << targetConsumersConsumption << " ResultingConsumersConsumption: " << resultingConsumersConsumption
             << " Coefficient: " << coefficient);
@@ -202,6 +214,7 @@ private:
         Counters->GetCounter("Stats/HardLimit")->Set(hardLimitBytes);
         Counters->GetCounter("Stats/SoftLimit")->Set(softLimitBytes);
         Counters->GetCounter("Stats/TargetUtilization")->Set(targetUtilizationBytes);
+        Counters->GetCounter("Stats/ActivitiesLimitBytes")->Set(activitiesLimitBytes);
         Counters->GetCounter("Stats/ConsumersConsumption")->Set(consumersConsumption);
         Counters->GetCounter("Stats/OtherConsumption")->Set(otherConsumption);
         Counters->GetCounter("Stats/ExternalConsumption")->Set(externalConsumption);
@@ -227,7 +240,7 @@ private:
         ui64 consumersLimitBytes = 0;
         for (const auto& consumer : consumers) {
             ui64 limitBytes = consumer.GetLimit(coefficient);
-            if (resultingConsumersConsumption + otherConsumption + externalConsumption > softLimitBytes && consumer.Config.CanZeroLimit) {
+            if (resultingConsumersConsumption + otherConsumption + externalConsumption > softLimitBytes && consumer.CanZeroLimit) {
                 limitBytes = SafeDiff(limitBytes, resultingConsumersConsumption + otherConsumption + externalConsumption - softLimitBytes);
             }
             consumersLimitBytes += limitBytes;
@@ -249,6 +262,9 @@ private:
         Counters->GetCounter("Stats/ConsumersLimit")->Set(consumersLimitBytes);
         memoryStats.SetConsumersLimit(consumersLimitBytes);
 
+        // Note: for now ResourceBroker and its queues aren't MemoryController consumers and don't share limits with other caches
+        ApplyResourceBrokerLimits(hardLimitBytes, activitiesLimitBytes);
+
         Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), memoryStatsUpdate);
 
         ctx.Schedule(Interval, new TEvents::TEvWakeup());
@@ -283,6 +299,14 @@ private:
         }
     }
 
+    // Logs ResourceBroker's reply to a configure request:
+    // INFO when the broker reports success, ERROR otherwise.
+    void Handle(TEvResourceBroker::TEvConfigureResult::TPtr &ev, const TActorContext& ctx) {
+        const auto& record = ev->Get()->Record;
+        const auto priority = record.GetSuccess()
+            ? NActors::NLog::PRI_INFO
+            : NActors::NLog::PRI_ERROR;
+
+        LOG_LOG_S(ctx, priority, NKikimrServices::MEMORY_CONTROLLER,
+            "ResourceBroker configure result " << record.ShortDebugString());
+    }
+
     double BinarySearchCoefficient(const TVector<TConsumerState>& consumers, ui64 availableMemory) {
         static const ui32 BinarySearchIterations = 20;
 
@@ -326,6 +350,36 @@ private:
         }
     }
 
+    // Sends updated memory limits to ResourceBroker, but only when they differ
+    // from the limits sent last time.
+    //
+    // hardLimitBytes       - base value the query execution limit is derived from.
+    // activitiesLimitBytes - value applied as ResourceBroker's total memory limit.
+    //
+    // NOTE(review): the new limits are remembered before ResourceBroker replies,
+    // so a rejected configure request is not re-sent until the limits change
+    // again - confirm this is intended.
+    void ApplyResourceBrokerLimits(ui64 hardLimitBytes, ui64 activitiesLimitBytes) {
+        const TResourceBrokerLimits newLimits{
+            activitiesLimitBytes,
+            GetQueryExecutionLimitBytes(Config, hardLimitBytes)
+        };
+
+        if (newLimits == CurrentResourceBrokerLimits) {
+            return; // same limits as last time, nothing to send
+        }
+        CurrentResourceBrokerLimits = newLimits;
+
+        LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::MEMORY_CONTROLLER, "Consumer QueryExecution state:"
+            << " Limit: " << newLimits.QueryExecutionLimitBytes);
+        Counters->GetCounter("Consumer/QueryExecution/Limit")->Set(newLimits.QueryExecutionLimitBytes);
+
+        // Merge is requested, so the record below carries only the values to change:
+        // the total memory limit and the KQP resource manager queue's memory limit.
+        auto request = MakeHolder<TEvResourceBroker::TEvConfigure>();
+        request->Merge = true;
+        request->Record.MutableResourceLimit()->SetMemory(newLimits.LimitBytes);
+
+        auto* kqpQueue = request->Record.AddQueues();
+        kqpQueue->SetName(NLocalDb::KqpResourceManagerQueue);
+        kqpQueue->MutableLimit()->SetMemory(newLimits.QueryExecutionLimitBytes);
+
+        Send(MakeResourceBrokerID(), request.Release());
+    }
+
     TConsumerCounters& GetConsumerCounters(EMemoryConsumerKind consumer) {
         auto it = ConsumerCounters.FindPtr(consumer);
         if (it) {
@@ -358,78 +412,18 @@ private:
         }
     }
 
-    TConsumerState BuildConsumerState(const TMemoryConsumer& consumer, ui64 availableMemory) const {
-        auto config = GetConsumerConfig(consumer.Kind);
+    TConsumerState BuildConsumerState(const TMemoryConsumer& consumer, ui64 hardLimitBytes) const {
+        TConsumerState result(consumer);
         
-        std::optional<ui64> minBytes;
-        std::optional<ui64> maxBytes;
-
-        if (config.MinPercent.has_value() && config.MinBytes.has_value()) {
-            minBytes = Max(GetPercent(config.MinPercent.value(), availableMemory), config.MinBytes.value());
-        } else if (config.MinPercent.has_value()) {
-            minBytes = GetPercent(config.MinPercent.value(), availableMemory);
-        } else if (config.MinBytes.has_value()) {
-            minBytes = config.MinBytes.value();
-        }
-
-        if (config.MaxPercent.has_value() && config.MaxBytes.has_value()) {
-            maxBytes = Min(GetPercent(config.MaxPercent.value(), availableMemory), config.MaxBytes.value());
-        } else if (config.MaxPercent.has_value()) {
-            maxBytes = GetPercent(config.MaxPercent.value(), availableMemory);
-        } else if (config.MaxBytes.has_value()) {
-            maxBytes = config.MaxBytes.value();
-        }
-
-        if (minBytes.has_value() && !maxBytes.has_value()) {
-            maxBytes = minBytes;
-        }
-        if (!minBytes.has_value() && maxBytes.has_value()) {
-            minBytes = maxBytes;
-        }
-
-        TConsumerState result(std::move(consumer), config);
-
-        result.MinBytes = minBytes.value_or(0);
-        result.MaxBytes = maxBytes.value_or(0);
-        if (result.MinBytes > result.MaxBytes) {
-            result.MinBytes = result.MaxBytes;
-        }
-
-        return result;
-    }
-
-    TConsumerConfig GetConsumerConfig(EMemoryConsumerKind consumer) const {
-        TConsumerConfig result;
-
-        switch (consumer) {
+        switch (consumer.Kind) {
             case EMemoryConsumerKind::MemTable: {
-                if (Config.HasMemTableMinPercent() || Config.GetMemTableMinPercent()) {
-                    result.MinPercent = Config.GetMemTableMinPercent();
-                }
-                if (Config.HasMemTableMinBytes() || Config.GetMemTableMinBytes()) {
-                    result.MinBytes = Config.GetMemTableMinBytes();
-                }
-                if (Config.HasMemTableMaxPercent() || Config.GetMemTableMaxPercent()) {
-                    result.MaxPercent = Config.GetMemTableMaxPercent();
-                }
-                if (Config.HasMemTableMaxBytes() || Config.GetMemTableMaxBytes()) {
-                    result.MaxBytes = Config.GetMemTableMaxBytes();
-                }
+                result.MinBytes = GetMemTableMinBytes(Config, hardLimitBytes);
+                result.MaxBytes = GetMemTableMaxBytes(Config, hardLimitBytes);
                 break;
             }
             case EMemoryConsumerKind::SharedCache: {
-                if (Config.HasSharedCacheMinPercent() || Config.GetSharedCacheMinPercent()) {
-                    result.MinPercent = Config.GetSharedCacheMinPercent();
-                }
-                if (Config.HasSharedCacheMinBytes() || Config.GetSharedCacheMinBytes()) {
-                    result.MinBytes = Config.GetSharedCacheMinBytes();
-                }
-                if (Config.HasSharedCacheMaxPercent() || Config.GetSharedCacheMaxPercent()) {
-                    result.MaxPercent = Config.GetSharedCacheMaxPercent();
-                }
-                if (Config.HasSharedCacheMaxBytes() || Config.GetSharedCacheMaxBytes()) {
-                    result.MaxBytes = Config.GetSharedCacheMaxBytes();
-                }
+                result.MinBytes = GetSharedCacheMinBytes(Config, hardLimitBytes);
+                result.MaxBytes = GetSharedCacheMaxBytes(Config, hardLimitBytes);
                 result.CanZeroLimit = true;
                 break;
             }
@@ -437,49 +431,11 @@ private:
                 Y_ABORT("Unhandled consumer");
         }
 
-        return result;
-    }
-
-    ui64 GetHardLimitBytes(const TProcessMemoryInfo& info, bool& hasMemTotalHardLimit) const {
-        if (Config.HasHardLimitBytes()) {
-            ui64 hardLimitBytes = Config.GetHardLimitBytes();
-            if (info.CGroupLimit.has_value()) {
-                hardLimitBytes = Min(hardLimitBytes, info.CGroupLimit.value());
-            }
-            return hardLimitBytes;
-        }
-        if (info.CGroupLimit.has_value()) {
-            return info.CGroupLimit.value();
-        }
-        if (info.MemTotal) {
-            hasMemTotalHardLimit = true;
-            return info.MemTotal.value();
-        }
-        return 512_MB; // fallback
-    }
-
-    ui64 GetSoftLimitBytes(ui64 hardLimitBytes) const {
-        if (Config.HasSoftLimitPercent() && Config.HasSoftLimitBytes()) {
-            return Min(GetPercent(Config.GetSoftLimitPercent(), hardLimitBytes), Config.GetSoftLimitBytes());
-        }
-        if (Config.HasSoftLimitBytes()) {
-            return Config.GetSoftLimitBytes();
-        }
-        return GetPercent(Config.GetSoftLimitPercent(), hardLimitBytes);
-    }
-
-    ui64 GetTargetUtilizationBytes(ui64 hardLimitBytes) const {
-        if (Config.HasTargetUtilizationPercent() && Config.HasTargetUtilizationBytes()) {
-            return Min(GetPercent(Config.GetTargetUtilizationPercent(), hardLimitBytes), Config.GetTargetUtilizationBytes());
-        }
-        if (Config.HasTargetUtilizationBytes()) {
-            return Config.GetTargetUtilizationBytes();
+        if (result.MinBytes > result.MaxBytes) {
+            result.MinBytes = result.MaxBytes;
         }
-        return GetPercent(Config.GetTargetUtilizationPercent(), hardLimitBytes);
-    }
 
-    ui64 GetPercent(float percent, ui64 value) const {
-        return static_cast<ui64>(static_cast<double>(value) * (percent / 100.0));
+        return result;
     }
 
 private:
@@ -490,6 +446,7 @@ private:
     NKikimrConfig::TMemoryControllerConfig Config;
     const TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
     TMap<EMemoryConsumerKind, TConsumerCounters> ConsumerCounters;
+    std::optional<TResourceBrokerLimits> CurrentResourceBrokerLimits;
 };
 
 }
@@ -498,7 +455,7 @@ IActor* CreateMemoryController(
         TDuration interval,
         TIntrusiveConstPtr<IProcessMemoryInfoProvider> processMemoryInfoProvider,
         const NKikimrConfig::TMemoryControllerConfig& config, 
-        TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
+        const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
     return new TMemoryController(
         interval,
         std::move(processMemoryInfoProvider),

+ 1 - 1
ydb/core/memory_controller/memory_controller.h

@@ -14,6 +14,6 @@ NActors::IActor* CreateMemoryController(
     TDuration interval,
     TIntrusiveConstPtr<IProcessMemoryInfoProvider> processMemoryInfoProvider,
     const NKikimrConfig::TMemoryControllerConfig& config, 
-    TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
+    const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
 
 }

+ 115 - 0
ydb/core/memory_controller/memory_controller_config.h

@@ -0,0 +1,115 @@
+#include <ydb/core/mon_alloc/memory_info.h>
+#include <ydb/core/protos/memory_controller_config.pb.h>
+#include <util/generic/size_literals.h>
+
+#pragma once
+
+namespace NKikimr::NMemory {
+
+// Returns `percent` percent of `value`; the fractional part of the result is
+// discarded by the conversion back to ui64.
+//
+// Declared `inline` with external linkage instead of living in an unnamed
+// namespace: this is a header, and the inline limit getters below call this
+// function, so every translation unit must see one and the same definition.
+inline ui64 GetPercent(float percent, ui64 value) {
+    return static_cast<ui64>(static_cast<double>(value) * (percent / 100.0));
+}
+
+// Picks the hard memory limit in bytes, in this order of preference:
+//   1. HardLimitBytes from the config, capped by the cgroup limit when one is known;
+//   2. the cgroup limit;
+//   3. total memory reported in `info` (sets `hasMemTotalHardLimit` to true);
+//   4. a fixed 512 MB fallback.
+// `hasMemTotalHardLimit` is only ever set to true here; it is never reset.
+inline ui64 GetHardLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const TProcessMemoryInfo& info, bool& hasMemTotalHardLimit) {
+    const bool cgroupKnown = info.CGroupLimit.has_value();
+
+    if (config.HasHardLimitBytes()) {
+        const ui64 configured = config.GetHardLimitBytes();
+        return cgroupKnown ? Min(configured, info.CGroupLimit.value()) : configured;
+    }
+
+    if (cgroupKnown) {
+        return info.CGroupLimit.value();
+    }
+
+    if (info.MemTotal) {
+        hasMemTotalHardLimit = true;
+        return info.MemTotal.value();
+    }
+
+    return 512_MB; // fallback
+}
+
+// Resolves the soft memory limit in bytes.
+// SoftLimitBytes unset  -> SoftLimitPercent of `hardLimitBytes`.
+// only SoftLimitBytes   -> SoftLimitBytes as is.
+// both explicitly set   -> the smaller of the two.
+inline ui64 GetSoftLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasSoftLimitBytes()) {
+        return GetPercent(config.GetSoftLimitPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetSoftLimitBytes();
+    if (!config.HasSoftLimitPercent()) {
+        return explicitBytes;
+    }
+
+    return Min(GetPercent(config.GetSoftLimitPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the target memory utilization in bytes.
+// TargetUtilizationBytes unset -> TargetUtilizationPercent of `hardLimitBytes`.
+// only TargetUtilizationBytes  -> TargetUtilizationBytes as is.
+// both explicitly set          -> the smaller of the two.
+inline ui64 GetTargetUtilizationBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasTargetUtilizationBytes()) {
+        return GetPercent(config.GetTargetUtilizationPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetTargetUtilizationBytes();
+    if (!config.HasTargetUtilizationPercent()) {
+        return explicitBytes;
+    }
+
+    return Min(GetPercent(config.GetTargetUtilizationPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the activities memory limit in bytes.
+// ActivitiesLimitBytes unset -> ActivitiesLimitPercent of `hardLimitBytes`.
+// only ActivitiesLimitBytes  -> ActivitiesLimitBytes as is.
+// both explicitly set        -> the smaller of the two.
+inline ui64 GetActivitiesLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasActivitiesLimitBytes()) {
+        return GetPercent(config.GetActivitiesLimitPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetActivitiesLimitBytes();
+    if (!config.HasActivitiesLimitPercent()) {
+        return explicitBytes;
+    }
+
+    return Min(GetPercent(config.GetActivitiesLimitPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the lower bound of the MemTable memory limit in bytes.
+// MemTableMinBytes unset -> MemTableMinPercent of `hardLimitBytes`.
+// only MemTableMinBytes  -> MemTableMinBytes as is.
+// both explicitly set    -> the larger of the two.
+inline ui64 GetMemTableMinBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasMemTableMinBytes()) {
+        return GetPercent(config.GetMemTableMinPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetMemTableMinBytes();
+    if (!config.HasMemTableMinPercent()) {
+        return explicitBytes;
+    }
+
+    return Max(GetPercent(config.GetMemTableMinPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the upper bound of the MemTable memory limit in bytes.
+// MemTableMaxBytes unset -> MemTableMaxPercent of `hardLimitBytes`.
+// only MemTableMaxBytes  -> MemTableMaxBytes as is.
+// both explicitly set    -> the smaller of the two.
+inline ui64 GetMemTableMaxBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasMemTableMaxBytes()) {
+        return GetPercent(config.GetMemTableMaxPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetMemTableMaxBytes();
+    if (!config.HasMemTableMaxPercent()) {
+        return explicitBytes;
+    }
+
+    return Min(GetPercent(config.GetMemTableMaxPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the lower bound of the SharedCache memory limit in bytes.
+// SharedCacheMinBytes unset -> SharedCacheMinPercent of `hardLimitBytes`.
+// only SharedCacheMinBytes  -> SharedCacheMinBytes as is.
+// both explicitly set       -> the larger of the two.
+inline ui64 GetSharedCacheMinBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasSharedCacheMinBytes()) {
+        return GetPercent(config.GetSharedCacheMinPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetSharedCacheMinBytes();
+    if (!config.HasSharedCacheMinPercent()) {
+        return explicitBytes;
+    }
+
+    return Max(GetPercent(config.GetSharedCacheMinPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the upper bound of the SharedCache memory limit in bytes.
+// SharedCacheMaxBytes unset -> SharedCacheMaxPercent of `hardLimitBytes`.
+// only SharedCacheMaxBytes  -> SharedCacheMaxBytes as is.
+// both explicitly set       -> the smaller of the two.
+inline ui64 GetSharedCacheMaxBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasSharedCacheMaxBytes()) {
+        return GetPercent(config.GetSharedCacheMaxPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetSharedCacheMaxBytes();
+    if (!config.HasSharedCacheMaxPercent()) {
+        return explicitBytes;
+    }
+
+    return Min(GetPercent(config.GetSharedCacheMaxPercent(), hardLimitBytes), explicitBytes);
+}
+
+// Resolves the query execution memory limit in bytes.
+// QueryExecutionLimitBytes unset -> QueryExecutionLimitPercent of `hardLimitBytes`.
+// only QueryExecutionLimitBytes  -> QueryExecutionLimitBytes as is.
+// both explicitly set            -> the smaller of the two.
+inline ui64 GetQueryExecutionLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, ui64 hardLimitBytes) {
+    if (!config.HasQueryExecutionLimitBytes()) {
+        return GetPercent(config.GetQueryExecutionLimitPercent(), hardLimitBytes);
+    }
+
+    const ui64 explicitBytes = config.GetQueryExecutionLimitBytes();
+    if (!config.HasQueryExecutionLimitPercent()) {
+        return explicitBytes;
+    }
+
+    return Min(GetPercent(config.GetQueryExecutionLimitPercent(), hardLimitBytes), explicitBytes);
+}
+
+}

+ 55 - 0
ydb/core/memory_controller/memory_controller_ut.cpp

@@ -1,5 +1,6 @@
 #include <library/cpp/testing/unittest/registar.h>
 #include <library/cpp/testing/unittest/tests_data.h>
+#include <ydb/core/tablet/resource_broker.h>
 #include <ydb/core/tablet_flat/shared_sausagecache.h>
 #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
 #include <ydb/library/actors/testlib/test_runtime.h>
@@ -65,6 +66,7 @@ private:
 
         Runtime->SetLogPriority(NKikimrServices::MEMORY_CONTROLLER, NLog::PRI_TRACE);
         Runtime->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE);
+        Runtime->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NLog::PRI_TRACE);
     }
 
 private:
@@ -209,6 +211,9 @@ Y_UNIT_TEST(Config_ConsumerLimits) {
     memoryControllerConfig->SetMemTableMinBytes(10_MB);
     memoryControllerConfig->SetMemTableMaxBytes(50_MB);
 
+    memoryControllerConfig->SetQueryExecutionLimitPercent(15);
+    memoryControllerConfig->SetQueryExecutionLimitBytes(30_MB);
+
     auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
     auto& runtime = *server->GetRuntime();
     
@@ -218,6 +223,7 @@ Y_UNIT_TEST(Config_ConsumerLimits) {
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/SharedCache/LimitMax")->Val(), 300_MB);
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/LimitMin")->Val(), 50_MB);
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/LimitMax")->Val(), 50_MB);
+    UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 30_MB);
 
     server->ProcessMemoryInfo->CGroupLimit = 400_MB;
     runtime.SimulateSleep(TDuration::Seconds(2));
@@ -225,6 +231,7 @@ Y_UNIT_TEST(Config_ConsumerLimits) {
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/SharedCache/LimitMax")->Val(), 120_MB);
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/LimitMin")->Val(), 40_MB);
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/LimitMax")->Val(), 50_MB);
+    UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 30_MB);
 
     server->ProcessMemoryInfo->CGroupLimit = 100_MB;
     runtime.SimulateSleep(TDuration::Seconds(2));
@@ -232,6 +239,7 @@ Y_UNIT_TEST(Config_ConsumerLimits) {
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/SharedCache/LimitMax")->Val(), 30_MB);
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/LimitMin")->Val(), 10_MB);
     UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/LimitMax")->Val(), 20_MB);
+    UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 15_MB);
 }
 
 Y_UNIT_TEST(SharedCache) {
@@ -366,6 +374,53 @@ Y_UNIT_TEST(MemTable) {
     UNIT_ASSERT_GT(server->MemoryControllerCounters->GetCounter("Consumer/MemTable/Consumption")->Val(), static_cast<i64>(1_KB));
 }
 
+// Verifies that the KQP resource manager queue's memory limit reported by
+// ResourceBroker follows the cgroup limit (15% of it), and that the
+// "queue_cs_ttl" and "queue_cs_general" queues keep their own settings.
+Y_UNIT_TEST(ResourceBroker) {
+    using namespace NResourceBroker;
+
+    TPortManager pm;
+    TServerSettings serverSettings(pm.GetPort(2134));
+    serverSettings.SetDomainName("Root")
+        .SetUseRealThreads(false);
+
+    serverSettings.AppConfig->MutableMemoryControllerConfig()->SetQueryExecutionLimitPercent(15);
+
+    // Explicit memory limit for an unrelated queue; it must not change.
+    auto ttlQueue = serverSettings.AppConfig->MutableResourceBrokerConfig()->AddQueues();
+    ttlQueue->SetName("queue_cs_ttl");
+    ttlQueue->MutableLimit()->SetMemory(13_MB);
+
+    auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
+    server->ProcessMemoryInfo->CGroupLimit = 1000_MB;
+    auto& runtime = *server->GetRuntime();
+    TAutoPtr<IEventHandle> handle;
+    auto sender = runtime.AllocateEdgeActor();
+
+    InitRoot(server, sender);
+
+    // Asks ResourceBroker for the config of the named queue and returns the response event.
+    auto requestQueueConfig = [&](const auto& queueName) {
+        runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(queueName)));
+        return runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
+    };
+
+    // 15% of 1000 MB.
+    runtime.SimulateSleep(TDuration::Seconds(2));
+    auto config = requestQueueConfig(NLocalDb::KqpResourceManagerQueue);
+    UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 150_MB);
+
+    // 15% of 500 MB after the cgroup limit shrinks.
+    server->ProcessMemoryInfo->CGroupLimit = 500_MB;
+    runtime.SimulateSleep(TDuration::Seconds(2));
+    config = requestQueueConfig(NLocalDb::KqpResourceManagerQueue);
+    UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 75_MB);
+
+    // ensure that other settings are not affected:
+    config = requestQueueConfig("queue_cs_ttl");
+    UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
+    UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 13_MB);
+
+    config = requestQueueConfig("queue_cs_general");
+    UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
+    UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 3221225472); // 3 * 2^30 bytes
+}
+
 }
 
 }

+ 1 - 0
ydb/core/memory_controller/ya.make

@@ -12,6 +12,7 @@ PEERDIR(
     ydb/core/cms/console
     ydb/core/mon_alloc
     ydb/core/node_whiteboard
+    ydb/core/tablet
 )
 
 END()

+ 6 - 0
ydb/core/protos/memory_controller_config.proto

@@ -10,6 +10,9 @@ message TMemoryControllerConfig {
     optional float TargetUtilizationPercent = 4 [default = 50];
     optional uint64 TargetUtilizationBytes = 5;
 
+    optional float ActivitiesLimitPercent = 6 [default = 30];
+    optional uint64 ActivitiesLimitBytes = 7;
+
     optional float SharedCacheMinPercent = 100 [default = 20];
     optional uint64 SharedCacheMinBytes = 101;
     optional float SharedCacheMaxPercent = 102 [default = 50];
@@ -19,4 +22,7 @@ message TMemoryControllerConfig {
     optional uint64 MemTableMinBytes = 111;
     optional float MemTableMaxPercent = 112 [default = 3];
     optional uint64 MemTableMaxBytes = 113;
+
+    optional float QueryExecutionLimitPercent = 120 [default = 20];
+    optional uint64 QueryExecutionLimitBytes = 121;
 }

+ 44 - 28
ydb/core/tablet/resource_broker.cpp

@@ -138,7 +138,7 @@ void TDurationStat::Add(TDuration duration)
 
 TDuration TDurationStat::GetAverage() const
 {
-    return Total / Values.size();
+    return Total / Max<size_t>(1, Values.size());
 }
 
 bool TTaskQueue::TTaskEarlier::operator()(const TTaskPtr &l,
@@ -284,7 +284,7 @@ void TTaskQueue::UpdateRealResourceUsage(TInstant now)
 
     // Find dominant resource consumption and update usage
     auto dom = GetDominantResourceComponentNormalized(QueueLimit.Used);
-    auto usage = RealResourceUsage + dom * duration.MilliSeconds() / Weight;
+    auto usage = RealResourceUsage + dom * duration.MilliSeconds() / Max(1u, Weight);
     RealResourceUsage = usage;
 
     UsageTimestamp = now;
@@ -305,11 +305,11 @@ void TTaskQueue::UpdatePlannedResourceUsage(TTaskPtr task,
 
     auto dom = GetDominantResourceComponentNormalized(task->RequiredResources);
     if (decrease) {
-        PlannedResourceUsage -= dom * duration.MilliSeconds() / Weight;
+        PlannedResourceUsage -= dom * duration.MilliSeconds() / Max(1u, Weight);
         PlannedResourceUsage = Max(PlannedResourceUsage, RealResourceUsage);
     } else {
         PlannedResourceUsage = Max(PlannedResourceUsage, RealResourceUsage);
-        PlannedResourceUsage += dom * duration.MilliSeconds() / Weight;
+        PlannedResourceUsage += dom * duration.MilliSeconds() / Max(1u, Weight);
     }
 }
 
@@ -317,9 +317,9 @@ double TTaskQueue::GetDominantResourceComponentNormalized(const TResourceValues
 {
     std::array<double, RESOURCE_COUNT> norm;
     for (size_t i = 0; i < norm.size(); ++i)
-        norm[i] = (double)QueueLimit.Used[i] / (double)TotalLimit->Limit[i];
+        norm[i] = (double)QueueLimit.Used[i] / (double)Max(1lu, TotalLimit->Limit[i]);
     size_t i = MaxElement(norm.begin(), norm.end()) - norm.begin();
-    return (double)values[i] / (double)TotalLimit->Limit[i];
+    return (double)values[i] / (double)Max(1lu, TotalLimit->Limit[i]);
 }
 
 void TTaskQueue::OutputState(IOutputStream &os, const TString &prefix) const
@@ -812,6 +812,13 @@ TResourceBroker::TResourceBroker(const TResourceBrokerConfig &config,
     }
 }
 
+// Returns a copy of the broker's configuration; the copy is made while Lock is held.
+NKikimrResourceBroker::TResourceBrokerConfig TResourceBroker::GetConfig() const
+{
+    const auto guard = ::Guard(Lock);
+    return Config;
+}
+
 void TResourceBroker::Configure(const TResourceBrokerConfig &config)
 {
     with_lock(Lock) {
@@ -1098,8 +1105,8 @@ void TResourceBroker::OutputState(TStringStream& str)
 
 TResourceBrokerActor::TResourceBrokerActor(const TResourceBrokerConfig &config,
                                            const ::NMonitoring::TDynamicCounterPtr &counters)
-    : Config(config)
-    , Counters(counters)
+    : BootstrapConfig(config)
+    , BootstrapCounters(counters)
 {
 }
 
@@ -1114,7 +1121,7 @@ void TResourceBrokerActor::Bootstrap(const TActorContext &ctx)
                                false, ctx.ExecutorThread.ActorSystem, ctx.SelfID);
     }
 
-    ResourceBroker = MakeIntrusive<TResourceBroker>(std::move(Config), std::move(Counters), ctx.ActorSystem());
+    ResourceBroker = MakeIntrusive<TResourceBroker>(std::move(BootstrapConfig), std::move(BootstrapCounters), ctx.ActorSystem());
     Become(&TThis::StateWork);
 }
 
@@ -1177,20 +1184,23 @@ void TResourceBrokerActor::Handle(TEvResourceBroker::TEvNotifyActorDied::TPtr &e
 void TResourceBrokerActor::Handle(TEvResourceBroker::TEvConfigure::TPtr &ev,
                                   const TActorContext &ctx)
 {
-    auto &rec = ev->Get()->Record;
-    TAutoPtr<TEvResourceBroker::TEvConfigureResult> response
-        = new TEvResourceBroker::TEvConfigureResult;
+    auto &config = ev->Get()->Record;
+    if (ev->Get()->Merge) {
+        LOG_INFO_S(ctx, NKikimrServices::RESOURCE_BROKER, "New config diff: " << config.ShortDebugString());
+        auto current = ResourceBroker->GetConfig();
+        MergeConfigUpdates(current, config);
+        config.Swap(&current);
+    }
 
-    LOG_DEBUG(ctx, NKikimrServices::RESOURCE_BROKER, "New config: %s",
-              rec.ShortDebugString().data());
+    LOG_INFO_S(ctx, NKikimrServices::RESOURCE_BROKER, "New config: " << config.ShortDebugString());
 
     TSet<TString> queues;
     TSet<TString> tasks;
     bool success = true;
     TString error;
-    for (auto &queue : rec.GetQueues())
+    for (auto &queue : config.GetQueues())
         queues.insert(queue.GetName());
-    for (auto &task : rec.GetTasks()) {
+    for (auto &task : config.GetTasks()) {
         if (!queues.contains(task.GetQueueName())) {
             error = Sprintf("task '%s' uses unknown queue '%s'", task.GetName().data(), task.GetQueueName().data());
             success = false;
@@ -1207,6 +1217,8 @@ void TResourceBrokerActor::Handle(TEvResourceBroker::TEvConfigure::TPtr &ev,
         success = false;
     }
 
+    TAutoPtr<TEvResourceBroker::TEvConfigureResult> response = new TEvResourceBroker::TEvConfigureResult;
+
     if (!success) {
         response->Record.SetSuccess(false);
         response->Record.SetMessage(error);
@@ -1219,19 +1231,22 @@ void TResourceBrokerActor::Handle(TEvResourceBroker::TEvConfigure::TPtr &ev,
     } else {
         response->Record.SetSuccess(true);
 
-        ResourceBroker->Configure(std::move(ev->Get()->Record));
+        ResourceBroker->Configure(std::move(config));
     }
 
-    LOG_DEBUG(ctx, NKikimrServices::RESOURCE_BROKER, "Configure result: %s",
-              response->Record.ShortDebugString().data());
+    LOG_LOG_S(ctx, 
+        success ? NActors::NLog::PRI_INFO : NActors::NLog::PRI_ERROR, 
+        NKikimrServices::RESOURCE_BROKER, 
+        "Configure result: " << response->Record.ShortDebugString());
 
     ctx.Send(ev->Sender, response.Release());
 }
 
 void TResourceBrokerActor::Handle(TEvResourceBroker::TEvConfigRequest::TPtr& ev, const TActorContext&)
 {
+    auto config = ResourceBroker->GetConfig();
     auto resp = MakeHolder<TEvResourceBroker::TEvConfigResponse>();
-    for (auto& queue : Config.GetQueues()) {
+    for (auto& queue : config.GetQueues()) {
         if (queue.GetName() == ev->Get()->Queue) {
             resp->QueueConfig = queue;
             break;
@@ -1250,11 +1265,13 @@ void TResourceBrokerActor::Handle(TEvResourceBroker::TEvResourceBrokerRequest::T
 
 void TResourceBrokerActor::Handle(NMon::TEvHttpInfo::TPtr &ev, const TActorContext &ctx)
 {
+    auto config = ResourceBroker->GetConfig();
+    
     TStringStream str;
     HTML(str) {
         PRE() {
             str << "Current config:" << Endl
-                << Config.DebugString() << Endl;
+                << config.DebugString() << Endl;
             ResourceBroker->OutputState(str);
         }
     }
@@ -1266,20 +1283,19 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
     NKikimrResourceBroker::TResourceBrokerConfig config;
 
     const ui64 DefaultQueueCPU = 2;
-
     const ui64 KqpRmQueueCPU = 4;
-    const ui64 KqpRmQueueMemory = 10ULL << 30;
+    const ui64 TotalCPU = 20;
+
+    // Note: these memory limits will be overwritten by MemoryController
+    const ui64 KqpRmQueueMemory = 10_GB;
+    const ui64 TotalMemory = 16_GB;
+    static_assert(KqpRmQueueMemory < TotalMemory);
 
     const ui64 CSTTLCompactionMemoryLimit = NOlap::TGlobalLimits::TTLCompactionMemoryLimit;
     const ui64 CSInsertCompactionMemoryLimit = NOlap::TGlobalLimits::InsertCompactionMemoryLimit;
     const ui64 CSGeneralCompactionMemoryLimit = NOlap::TGlobalLimits::GeneralCompactionMemoryLimit;
     const ui64 CSScanMemoryLimit = NOlap::TGlobalLimits::ScanMemoryLimit;
 
-    const ui64 TotalCPU = 20;
-    const ui64 TotalMemory = 16ULL << 30;
-
-    static_assert(KqpRmQueueMemory < TotalMemory);
-
     auto queue = config.AddQueues();
     queue->SetName(NLocalDb::DefaultQueueName);
     queue->SetWeight(30);

+ 1 - 0
ydb/core/tablet/resource_broker.h

@@ -191,6 +191,7 @@ struct TEvResourceBroker {
     struct TEvConfigure : public TEventPB<TEvConfigure,
                                           NKikimrResourceBroker::TResourceBrokerConfig,
                                           EvConfigure> {
+        bool Merge = false; // When true, the handler merges Record into the broker's current config (MergeConfigUpdates) before applying it; when false, Record is applied as-is.
     };
 
     struct TEvConfigureResult : public TEventPB<TEvConfigureResult,

+ 3 - 2
ydb/core/tablet/resource_broker_impl.h

@@ -414,6 +414,7 @@ public:
     bool MergeTasksInstant(ui64 recipientTaskId, ui64 donorTaskId, const TActorId &sender) override;
     bool ReduceTaskResourcesInstant(ui64 taskId, const TResourceValues& reduceBy, const TActorId& sender) override;
 
+    NKikimrResourceBroker::TResourceBrokerConfig GetConfig() const; // Returns the broker's config by value; the actor uses it as the merge base and prints it as "Current config".
     void Configure(const NKikimrResourceBroker::TResourceBrokerConfig &config);
 
     using TOpError = THolder<TEvResourceBroker::TEvTaskOperationError>;
@@ -478,8 +479,8 @@ private:
     void Handle(TEvResourceBroker::TEvResourceBrokerRequest::TPtr &ev, const TActorContext &ctx);
     void Handle(NMon::TEvHttpInfo::TPtr &ev, const TActorContext &ctx);
 
-    NKikimrResourceBroker::TResourceBrokerConfig Config;
-    ::NMonitoring::TDynamicCounterPtr Counters;
+    NKikimrResourceBroker::TResourceBrokerConfig BootstrapConfig;
+    ::NMonitoring::TDynamicCounterPtr BootstrapCounters;
     TIntrusivePtr<TResourceBroker> ResourceBroker;
 };
 

+ 1 - 1
ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

@@ -1352,7 +1352,7 @@ public:
         // After exact mem control implementation, this allocation should be deleted
         if (!MemoryQuotaManager->AllocateQuota(ReadActorFactoryCfg.DataInflight)) {
             TIssues issues;
-            issues.AddIssue(TIssue{TStringBuilder() << "OutOfMemory - can't allocate read buffer"});
+            issues.AddIssue(TIssue{TStringBuilder() << "OutOfMemory - can't allocate " << ReadActorFactoryCfg.DataInflight << "b read buffer"});
             Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::OVERLOADED));
             return;
         }

Some files were not shown because too many files changed in this diff