Browse Source

Add light indicator for bursts to BsCostModel, #1336 (#1337)

* Move TLight to core/util, add BurstDetector to TBsCostTracker

* Add UT

* Count PDisk responses, fix UT build

* Update BurstDetector

* Fix UT
Sergey Belyakov 1 year ago
parent
commit
3b1a363cec

+ 1 - 264
ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h

@@ -4,6 +4,7 @@
 #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
 #include <ydb/core/mon/mon.h>
 #include <ydb/core/protos/node_whiteboard.pb.h>
+#include <ydb/core/util/light.h>
 
 #include <library/cpp/bucket_quoter/bucket_quoter.h>
 #include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -14,270 +15,6 @@ namespace NKikimr {
 
 struct TPDiskConfig;
 
-inline NHPTimer::STime HPNow() {
-    NHPTimer::STime ret;
-    GetTimeFast(&ret);
-    return ret;
-}
-
-inline double HPSecondsFloat(i64 cycles) {
-    if (cycles > 0) {
-        return double(cycles) / NHPTimer::GetClockRate();
-    } else {
-        return 0.0;
-    }
-}
-
-inline double HPMilliSecondsFloat(i64 cycles) {
-    if (cycles > 0) {
-        return double(cycles) * 1000.0 / NHPTimer::GetClockRate();
-    } else {
-        return 0;
-    }
-}
-
-inline ui64 HPMilliSeconds(i64 cycles) {
-    return (ui64)HPMilliSecondsFloat(cycles);
-}
-
-inline ui64 HPMicroSecondsFloat(i64 cycles) {
-    if (cycles > 0) {
-        return double(cycles) * 1000000.0 / NHPTimer::GetClockRate();
-    } else {
-        return 0;
-    }
-}
-
-inline ui64 HPMicroSeconds(i64 cycles) {
-    return (ui64)HPMicroSecondsFloat(cycles);
-}
-
-inline ui64 HPNanoSeconds(i64 cycles) {
-    if (cycles > 0) {
-        return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate());
-    } else {
-        return 0;
-    }
-}
-
-inline ui64 HPCyclesNs(ui64 ns) {
-    return ui64(NHPTimer::GetClockRate() * double(ns) / 1000000000.0);
-}
-
-inline ui64 HPCyclesUs(ui64 us) {
-    return ui64(NHPTimer::GetClockRate() * double(us) / 1000000.0);
-}
-
-inline ui64 HPCyclesMs(ui64 ms) {
-    return ui64(NHPTimer::GetClockRate() * double(ms) / 1000.0);
-}
-
-class TLightBase {
-protected:
-    TString Name;
-    ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red)
-    ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state
-    ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state
-    ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state
-private:
-    ui64 RedCycles = 0;
-    ui64 GreenCycles = 0;
-    NHPTimer::STime AdvancedTill = 0;
-    NHPTimer::STime LastNow = 0;
-    ui64 UpdateThreshold = 0;
-public:
-    void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) {
-        Name = name;
-        State = counters->GetCounter(name + "_state");
-        Count = counters->GetCounter(name + "_count", true);
-        RedMs = counters->GetCounter(name + "_redMs", true);
-        GreenMs = counters->GetCounter(name + "_greenMs", true);
-        UpdateThreshold = HPCyclesMs(100);
-        AdvancedTill = Now();
-    }
-
-    void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName,
-            const TString& redMsName,const TString& greenMsName) {
-        Count = counters->GetCounter(countName, true);
-        RedMs = counters->GetCounter(redMsName, true);
-        GreenMs = counters->GetCounter(greenMsName, true);
-        UpdateThreshold = HPCyclesMs(100);
-        AdvancedTill = Now();
-    }
-
-    ui64 GetCount() const {
-        return *Count;
-    }
-
-    ui64 GetRedMs() const {
-        return *RedMs;
-    }
-
-    ui64 GetGreenMs() const {
-        return *GreenMs;
-    }
-protected:
-    void Modify(bool state, bool prevState) {
-        if (state && !prevState) { // Switched to ON state
-            if (State) {
-                *State = true;
-            }
-            (*Count)++;
-            return;
-        }
-        if (!state && prevState) { // Switched to OFF state
-            if (State) {
-                *State = false;
-            }
-            return;
-        }
-    }
-
-    void Advance(bool state, NHPTimer::STime now) {
-        if (now == AdvancedTill) {
-            return;
-        }
-        Elapsed(state, now - AdvancedTill);
-        if (RedCycles > UpdateThreshold) {
-            *RedMs += CutMs(RedCycles);
-        }
-        if (GreenCycles > UpdateThreshold) {
-            *GreenMs += CutMs(GreenCycles);
-        }
-        AdvancedTill = now;
-    }
-
-    NHPTimer::STime Now() {
-        // Avoid time going backwards
-        NHPTimer::STime now = HPNow();
-        if (now < LastNow) {
-            now = LastNow;
-        }
-        LastNow = now;
-        return now;
-    }
-private:
-    void Elapsed(bool state, ui64 cycles) {
-        if (state) {
-            RedCycles += cycles;
-        } else {
-            GreenCycles += cycles;
-        }
-    }
-
-    ui64 CutMs(ui64& src) {
-        ui64 ms = HPMilliSeconds(src);
-        ui64 cycles = HPCyclesMs(ms);
-        src -= cycles;
-        return ms;
-    }
-};
-
-// Thread-safe light
-class TLight : public TLightBase {
-private:
-    struct TItem {
-        bool State;
-        bool Filled;
-        TItem(bool state = false, bool filled = false)
-            : State(state)
-            , Filled(filled)
-        {}
-    };
-
-    // Cyclic buffer to enforce event ordering by seqno
-    TSpinLock Lock;
-    size_t HeadIdx = 0; // Index of current state
-    size_t FilledCount = 0;
-    ui16 Seqno = 0; // Current seqno
-    TStackVec<TItem, 32> Data; // In theory should have not more than thread count items
-public:
-    TLight() {
-        InitData();
-    }
-
-    void Set(bool state, ui16 seqno) {
-        TGuard<TSpinLock> g(Lock);
-        Push(state, seqno);
-        bool prevState;
-        // Note that 'state' variable is being reused
-        NHPTimer::STime now = Now();
-        while (Pop(state, prevState)) {
-            Modify(state, prevState);
-            Advance(prevState, now);
-        }
-    }
-
-    void Update() {
-        TGuard<TSpinLock> g(Lock);
-        Advance(Data[HeadIdx].State, Now());
-    }
-
-private:
-    void InitData(bool state = false, bool filled = false) {
-        Data.clear();
-        Data.emplace_back(state, filled);
-        Data.resize(32);
-        HeadIdx = 0;
-    }
-
-    void Push(bool state, ui16 seqno) {
-        FilledCount++;
-        if (FilledCount == 1) { // First event must initialize seqno
-            Seqno = seqno;
-            InitData(state, true);
-            if (state) {
-                Modify(true, false);
-            }
-            return;
-        }
-        Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
-                 (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
-        ui16 diff = seqno;
-        diff -= Seqno; // Underflow is fine
-        size_t size = Data.size();
-        if (size <= diff) { // Buffer is full -- extend and move wrapped part
-            Data.resize(size * 2);
-            for (size_t i = 0; i < HeadIdx; i++) {
-                Data[size + i] = Data[i];
-                Data[i].Filled = false;
-            }
-        }
-        TItem& item = Data[(HeadIdx + diff) % Data.size()];
-        Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
-                 (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
-        item.Filled = true;
-        item.State = state;
-    }
-
-    bool Pop(bool& state, bool& prevState) {
-        size_t nextIdx = (HeadIdx + 1) % Data.size();
-        TItem& head = Data[HeadIdx];
-        TItem& next = Data[nextIdx];
-        if (!head.Filled || !next.Filled) {
-            return false;
-        }
-        state = next.State;
-        prevState = head.State;
-        head.Filled = false;
-        HeadIdx = nextIdx;
-        Seqno++; // Overflow is fine
-        FilledCount--;
-        if (FilledCount == 1 && Data.size() > 32) {
-            InitData(state, true);
-        }
-        return true;
-    }
-
-    size_t CountFilled() const {
-        size_t ret = 0;
-        for (const TItem& item : Data) {
-            ret += item.Filled;
-        }
-        return ret;
-    }
-};
-
 class TBurstmeter {
 private:
     TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket;

+ 143 - 65
ydb/core/blobstorage/ut_blobstorage/monitoring.cpp

@@ -14,9 +14,17 @@ TString MakeData(ui32 dataSize) {
 template <typename TDerived>
 class TInflightActor : public TActorBootstrapped<TDerived> {
 public:
-    TInflightActor(ui32 requests, ui32 inflight)
-        : RequestCount(requests)
-        , RequestInflight(inflight)
+    struct TSettings {
+        ui32 Requests;
+        ui32 MaxInFlight;
+        TDuration Delay = TDuration::Zero();
+    };
+
+public:
+    TInflightActor(TSettings settings)
+        : RequestsToSend(settings.Requests)
+        , RequestInFlight(settings.MaxInFlight)
+        , Settings(settings)
     {}
 
     virtual ~TInflightActor() = default;
@@ -29,11 +37,18 @@ public:
     }
 
 protected:
-    void SendRequests() {
-        while (RequestInflight > 0 && RequestCount > 0) {
-            RequestInflight--;
-            RequestCount--;
-            SendRequest();
+    void ScheduleRequests() {
+        while (RequestInFlight > 0 && RequestsToSend > 0) {
+            TMonotonic now = TMonotonic::Now();
+            TDuration timePassed = now - LastTs;
+            if (timePassed >= Settings.Delay) {
+                LastTs = now;
+                RequestInFlight--;
+                RequestsToSend--;
+                SendRequest();
+            } else {
+                TActorBootstrapped<TDerived>::Schedule(Settings.Delay - timePassed, new TEvents::TEvWakeup);
+            }
         }
     }
 
@@ -43,89 +58,110 @@ protected:
         } else {
             Fails++;
         }
-        ++RequestInflight;
-        SendRequests();
+        ++RequestInFlight;
+        ScheduleRequests();
     }
 
     virtual void BootstrapImpl(const TActorContext &ctx) = 0;
     virtual void SendRequest() = 0;
 
 protected:
-    ui32 RequestCount;
-    ui32 RequestInflight;
+    ui32 RequestsToSend;
+    ui32 RequestInFlight;
     ui32 GroupId;
+    TMonotonic LastTs;
+    TSettings Settings;
 
 public:
     ui32 OKs = 0;
     ui32 Fails = 0;
 };
 
-template <typename TInflightActor>
-void Test(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor) {
-    const ui32 groupSize = topology.TotalVDisks;
-    const auto& groupErasure = topology.GType;
-    TEnvironmentSetup env{{
+ui64 AggregateVDiskCounters(std::unique_ptr<TEnvironmentSetup>& env, const NKikimrBlobStorage::TBaseConfig& baseConfig,
+        TString storagePool, ui32 groupSize, ui32 groupId, const std::vector<ui32>& pdiskLayout, TString subsystem,
+        TString counter, bool derivative = false) {
+    ui64 ctr = 0;
+
+    for (const auto& vslot : baseConfig.GetVSlot()) {
+        auto* appData = env->Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
+        for (ui32 i = 0; i < groupSize; ++i) {
+            ctr += GetServiceCounters(appData->Counters, "vdisks")->
+                    GetSubgroup("storagePool", storagePool)->
+                    GetSubgroup("group", std::to_string(groupId))->
+                    GetSubgroup("orderNumber", "0" + std::to_string(i))->
+                    GetSubgroup("pdisk", "00000" + std::to_string(pdiskLayout[i]))->
+                    GetSubgroup("media", "rot")->
+                    GetSubgroup("subsystem", subsystem)->
+                    GetCounter(counter, derivative)->Val();
+        }
+    }
+    return ctr;
+};
+
+void SetupEnv(const TBlobStorageGroupInfo::TTopology& topology, std::unique_ptr<TEnvironmentSetup>& env,
+        NKikimrBlobStorage::TBaseConfig& baseConfig, ui32& groupSize, TBlobStorageGroupType& groupType,
+        ui32& groupId, std::vector<ui32>& pdiskLayout) {
+    groupSize = topology.TotalVDisks;
+    groupType = topology.GType;
+    env.reset(new TEnvironmentSetup({
         .NodeCount = groupSize,
-        .Erasure = groupErasure,
-    }};
+        .Erasure = groupType,
+    }));
 
-    env.CreateBoxAndPool(1, 1);
-    env.Sim(TDuration::Seconds(30));
+    env->CreateBoxAndPool(1, 1);
+    env->Sim(TDuration::Seconds(30));
 
     NKikimrBlobStorage::TConfigRequest request;
     request.AddCommand()->MutableQueryBaseConfig();
-    auto response = env.Invoke(request);
+    auto response = env->Invoke(request);
 
-    const auto& baseConfig = response.GetStatus(0).GetBaseConfig();
+    baseConfig = response.GetStatus(0).GetBaseConfig();
     UNIT_ASSERT_VALUES_EQUAL(baseConfig.GroupSize(), 1);
-    ui32 groupId = baseConfig.GetGroup(0).GetGroupId();
-    std::vector<ui32> pdiskIds(groupSize);
+    groupId = baseConfig.GetGroup(0).GetGroupId();
+    pdiskLayout.resize(groupSize);
     for (const auto& vslot : baseConfig.GetVSlot()) {
         const auto& vslotId = vslot.GetVSlotId();
         ui32 orderNumber = topology.GetOrderNumber(TVDiskIdShort(vslot.GetFailRealmIdx(), vslot.GetFailDomainIdx(), vslot.GetVDiskIdx()));
         if (vslot.GetGroupId() == groupId) {
-            pdiskIds[orderNumber] = vslotId.GetPDiskId();
+            pdiskLayout[orderNumber] = vslotId.GetPDiskId();
         }
     }
+}
+
+template <typename TInflightActor>
+void TestDSProxyAndVDiskEqualCost(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor) {
+    std::unique_ptr<TEnvironmentSetup> env;
+    NKikimrBlobStorage::TBaseConfig baseConfig;
+    ui32 groupSize;
+    TBlobStorageGroupType groupType;
+    ui32 groupId;
+    std::vector<ui32> pdiskLayout;
+    SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout);
 
     ui64 dsproxyCost = 0;
     ui64 vdiskCost = 0;
 
-    auto vdisksTotal = [&](TString subsystem, TString counter, bool derivative = false) {
-        ui64 ctr = 0;
-        for (const auto& vslot : baseConfig.GetVSlot()) {
-            auto* appData = env.Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
-            for (ui32 i = 0; i < groupSize; ++i) {
-                ctr += GetServiceCounters(appData->Counters, "vdisks")->
-                        GetSubgroup("storagePool", env.StoragePoolName)->
-                        GetSubgroup("group", std::to_string(groupId))->
-                        GetSubgroup("orderNumber", "0" + std::to_string(i))->
-                        GetSubgroup("pdisk", "00000" + std::to_string(pdiskIds[i]))->
-                        GetSubgroup("media", "rot")->
-                        GetSubgroup("subsystem", subsystem)->
-                        GetCounter(counter, derivative)->Val();
-            }
-        }
-        return ctr;
-    };
 
     auto updateCounters = [&]() {
+        dsproxyCost = 0;
+
         for (const auto& vslot : baseConfig.GetVSlot()) {
-            auto* appData = env.Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
+            auto* appData = env->Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
             dsproxyCost += GetServiceCounters(appData->Counters, "dsproxynode")->
                     GetSubgroup("subsystem", "request")->
-                    GetSubgroup("storagePool", env.StoragePoolName)->
+                    GetSubgroup("storagePool", env->StoragePoolName)->
                     GetCounter("DSProxyDiskCostNs")->Val();
         }
-        vdiskCost = vdisksTotal("cost", "SkeletonFrontUserCostNs");
+        vdiskCost = AggregateVDiskCounters(env, baseConfig, env->StoragePoolName, groupSize, groupId,
+                pdiskLayout, "cost", "SkeletonFrontUserCostNs");
     };
 
     updateCounters();
     UNIT_ASSERT_VALUES_EQUAL(dsproxyCost, vdiskCost);
 
     actor->SetGroupId(groupId);
-    env.Runtime->Register(actor, 1);
-    env.Sim(TDuration::Minutes(15));
+    env->Runtime->Register(actor, 1);
+    env->Sim(TDuration::Minutes(15));
 
     updateCounters();
 
@@ -138,19 +174,21 @@ void Test(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* acto
 
     if constexpr(VERBOSE) {
         Cerr << str.Str() << Endl;
+        // env->Runtime->GetAppData()->Counters->OutputPlainText(Cerr);
     }
     UNIT_ASSERT_VALUES_EQUAL_C(dsproxyCost, vdiskCost, str.Str());
 }
 
 class TInflightActorPut : public TInflightActor<TInflightActorPut> {
 public:
-    TInflightActorPut(ui32 requests, ui32 inflight, ui32 dataSize = 1024)
-        : TInflightActor(requests, inflight)
+    TInflightActorPut(TSettings settings, ui32 dataSize = 1024)
+        : TInflightActor(settings)
         , DataSize(dataSize)
     {}
 
     STRICT_STFUNC(StateWork,
-        cFunc(TEvBlobStorage::TEvStatusResult::EventType, SendRequests);
+        cFunc(TEvBlobStorage::TEvStatusResult::EventType, ScheduleRequests);
+        cFunc(TEvents::TEvWakeup::EventType, ScheduleRequests);
         hFunc(TEvBlobStorage::TEvPutResult, Handle);
     )
 
@@ -164,7 +202,7 @@ public:
 protected:
     virtual void SendRequest() override {
         TString data = MakeData(DataSize);
-        auto ev = new TEvBlobStorage::TEvPut(TLogoBlobID(1, 1, 1, 10, DataSize, RequestCount + 1),
+        auto ev = new TEvBlobStorage::TEvPut(TLogoBlobID(1, 1, 1, 10, DataSize, RequestsToSend + 1),
                 data, TInstant::Max(), NKikimrBlobStorage::UserData);
         SendToBSProxy(SelfId(), GroupId, ev, 0);
     }
@@ -184,8 +222,8 @@ Y_UNIT_TEST(Test##requestType##erasure##Requests##requests##Inflight##inflight)
     ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1;   \
     ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8;  \
     TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \
-    auto actor = new TInflightActor##requestType(requests, inflight);               \
-    Test(topology, actor);                                                          \
+    auto actor = new TInflightActor##requestType({requests, inflight});             \
+    TestDSProxyAndVDiskEqualCost(topology, actor);                                  \
 }
 
 #define MAKE_TEST_W_DATASIZE(erasure, requestType, requests, inflight, dataSize)                        \
@@ -194,19 +232,20 @@ Y_UNIT_TEST(Test##requestType##erasure##Requests##requests##Inflight##inflight##
     ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1;                       \
     ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8;                      \
     TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true);                     \
-    auto actor = new TInflightActor##requestType(requests, inflight, dataSize);                         \
-    Test(topology, actor);                                                                              \
+    auto actor = new TInflightActor##requestType({requests, inflight}, dataSize);                       \
+    TestDSProxyAndVDiskEqualCost(topology, actor);                                                      \
 }
 
 class TInflightActorGet : public TInflightActor<TInflightActorGet> {
 public:
-    TInflightActorGet(ui32 requests, ui32 inflight, ui32 dataSize = 1024)
-        : TInflightActor(requests, inflight)
+    TInflightActorGet(TSettings settings, ui32 dataSize = 1024)
+        : TInflightActor(settings)
         , DataSize(dataSize)
     {}
 
     STRICT_STFUNC(StateWork,
-        cFunc(TEvBlobStorage::TEvPutResult::EventType, SendRequests);
+        cFunc(TEvBlobStorage::TEvPutResult::EventType, ScheduleRequests);
+        cFunc(TEvents::TEvWakeup::EventType, ScheduleRequests);
         hFunc(TEvBlobStorage::TEvGetResult, Handle);
     )
 
@@ -236,8 +275,8 @@ private:
 
 class TInflightActorPatch : public TInflightActor<TInflightActorPatch> {
 public:
-    TInflightActorPatch(ui32 requests, ui32 inflight, ui32 dataSize = 1024)
-        : TInflightActor(requests, inflight)
+    TInflightActorPatch(TSettings settings, ui32 dataSize = 1024)
+        : TInflightActor(settings)
         , DataSize(dataSize)
     {}
 
@@ -248,7 +287,7 @@ public:
 
     virtual void BootstrapImpl(const TActorContext&/* ctx*/) override {
         TString data = MakeData(DataSize);
-        for (ui32 i = 0; i < RequestInflight; ++i) {
+        for (ui32 i = 0; i < RequestInFlight; ++i) {
             TLogoBlobID blobId(1, 1, 1, 10, DataSize, 1 + i);
             auto ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max());
             SendToBSProxy(SelfId(), GroupId, ev, 0);
@@ -263,7 +302,7 @@ protected:
         TLogoBlobID newId(1, 1, oldId.Step() + 1, 10, DataSize, oldId.Cookie());
         Y_ABORT_UNLESS(TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(oldId, &newId, BlobIdMask, GroupId, GroupId));
         TArrayHolder<TEvBlobStorage::TEvPatch::TDiff> diffs(new TEvBlobStorage::TEvPatch::TDiff[1]);
-        char c = 'a' + RequestCount % 26;
+        char c = 'a' + RequestsToSend % 26;
         diffs[0].Set(TString(DataSize, c), 0);
         auto ev = new TEvBlobStorage::TEvPatch(GroupId, oldId, newId, BlobIdMask, std::move(diffs), 1, TInstant::Max());
         SendToBSProxy(SelfId(), GroupId, ev, 0);
@@ -277,8 +316,8 @@ protected:
 
     void Handle(TEvBlobStorage::TEvPutResult::TPtr res) {
         Blobs.push_back(res->Get()->Id);
-        if (++BlobsWritten == RequestInflight) {
-            SendRequests();
+        if (++BlobsWritten == RequestInFlight) {
+            ScheduleRequests();
         }
     }
 
@@ -368,3 +407,42 @@ Y_UNIT_TEST_SUITE(CostMetricsPatchBlock4Plus2) {
     MAKE_TEST_W_DATASIZE(4Plus2Block, Patch, 100, 10, 1000);
     MAKE_TEST_W_DATASIZE(4Plus2Block, Patch, 10000, 100, 1000);
 }
+
+template <typename TInflightActor>
+void TestBurst(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor, bool burstExpected) {
+    std::unique_ptr<TEnvironmentSetup> env;
+    NKikimrBlobStorage::TBaseConfig baseConfig;
+    ui32 groupSize;
+    TBlobStorageGroupType groupType;
+    ui32 groupId;
+    std::vector<ui32> pdiskLayout;
+    SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout);
+
+    actor->SetGroupId(groupId);
+    env->Runtime->Register(actor, 1);
+    env->Sim(TDuration::Seconds(15));
+
+    ui64 redMs = AggregateVDiskCounters(env, baseConfig, env->StoragePoolName, groupSize, groupId,
+            pdiskLayout, "advancedCost", "BurstDetector_redMs");
+    
+    if (burstExpected) {
+        UNIT_ASSERT_GT(redMs, 0);
+    } else {
+        UNIT_ASSERT_VALUES_EQUAL(redMs, burstExpected);
+    }
+}
+
+#define MAKE_BURST_TEST(testType, erasure, requestType, requests, inflight, delay, burstExpected)   \
+Y_UNIT_TEST(Test##requestType##testType##erasure) {                                                 \
+    auto groupType = TBlobStorageGroupType::Erasure##erasure;                                       \
+    ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1;                   \
+    ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8;                  \
+    TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true);                 \
+    auto* actor = new TInflightActor##requestType({requests, inflight, delay}, 1_KB);               \
+    TestBurst(topology, actor, burstExpected);                                                      \
+}
+
+Y_UNIT_TEST_SUITE(BurstDetection) {
+    MAKE_BURST_TEST(Evenly, 4Plus2Block, Put, 100000, 1, TDuration::MicroSeconds(1), false);
+    MAKE_BURST_TEST(Burst, 4Plus2Block, Put, 100000, 1000000, TDuration::Zero(), true);
+}

+ 2 - 0
ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp

@@ -38,7 +38,9 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
     , ScrubDiskCost(CostCounters->GetCounter("ScrubDiskCost", true))
     , DefragDiskCost(CostCounters->GetCounter("DefragDiskCost", true))
     , InternalDiskCost(CostCounters->GetCounter("InternalDiskCost", true))
+    , Bucket(1'000'000'000, BucketCapacity)
 {
+    BurstDetector.Initialize(CostCounters, "BurstDetector");
     switch (GroupType.GetErasure()) {
     case TBlobStorageGroupType::ErasureMirror3dc:
         CostModel = std::make_unique<TBsCostModelMirror3dc>(diskType);

+ 34 - 7
ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h

@@ -5,8 +5,10 @@
 #include "vdisk_events.h"
 #include "vdisk_handle_class.h"
 
+#include <library/cpp/bucket_quoter/bucket_quoter.h>
 #include <util/system/compiler.h>
 #include <ydb/core/blobstorage/base/blobstorage_events.h>
+#include <ydb/core/util/light.h>
 
 namespace NKikimr {
 
@@ -262,8 +264,7 @@ class TBsCostTracker {
 private:
     TBlobStorageGroupType GroupType;
     std::unique_ptr<TBsCostModelBase> CostModel;
-
-    const TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
+    TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
 
     ::NMonitoring::TDynamicCounters::TCounterPtr UserDiskCost;
     ::NMonitoring::TDynamicCounters::TCounterPtr CompactionDiskCost;
@@ -271,6 +272,11 @@ private:
     ::NMonitoring::TDynamicCounters::TCounterPtr DefragDiskCost;
     ::NMonitoring::TDynamicCounters::TCounterPtr InternalDiskCost;
 
+    TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket;
+    static constexpr ui64 BucketCapacity = 1'000'000'000;
+    TLight BurstDetector;
+    std::atomic<ui64> SeqnoBurstDetector = 0;
+
 public:
     TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::EDeviceType diskType,
             const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
@@ -296,38 +302,59 @@ public:
         }
     }
 
+    void CountRequest(ui64 cost) {
+        Bucket.Use(cost);
+        BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
+    }
+
 public:
     template<class TEvent>
     void CountUserRequest(const TEvent& ev) {
-        *UserDiskCost += GetCost(ev);
+        ui64 cost = GetCost(ev);
+        *UserDiskCost += cost;
+        CountRequest(cost);
     }
 
     void CountUserCost(ui64 cost) {
         *UserDiskCost += cost;
+        CountRequest(cost);
     }
 
     template<class TEvent>
     void CountCompactionRequest(const TEvent& ev) {
-        *CompactionDiskCost += GetCost(ev);
+        ui64 cost = GetCost(ev);
+        *CompactionDiskCost += cost;
+        CountRequest(cost);
     }
 
     template<class TEvent>
     void CountScrubRequest(const TEvent& ev) {
-        *UserDiskCost += GetCost(ev);
+        ui64 cost = GetCost(ev);
+        *UserDiskCost += cost;
+        CountRequest(cost);
     }
 
     template<class TEvent>
     void CountDefragRequest(const TEvent& ev) {
-        *DefragDiskCost += GetCost(ev);
+        ui64 cost = GetCost(ev);
+        *DefragDiskCost += cost;
+        CountRequest(cost);
     }
 
     template<class TEvent>
     void CountInternalRequest(const TEvent& ev) {
-        *InternalDiskCost += GetCost(ev);
+        ui64 cost = GetCost(ev);
+        *InternalDiskCost += cost;
+        CountRequest(cost);
     }
 
     void CountInternalCost(ui64 cost) {
         *InternalDiskCost += cost;
+        CountRequest(cost);
+    }
+
+    void CountPDiskResponse() {
+        BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
     }
 };
 

+ 2 - 0
ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h

@@ -170,6 +170,7 @@ namespace NKikimr {
         // the same logic for every yard response: apply response and restart main cycle
         void HandleYardResponse(NPDisk::TEvChunkReadResult::TPtr& ev, const TActorContext &ctx) {
             --PendingResponses;
+            HullCtx->VCtx->CostTracker->CountPDiskResponse();
             if (ev->Get()->Status != NKikimrProto::CORRUPTED) {
                 CHECK_PDISK_RESPONSE(HullCtx->VCtx, ev, ctx);
             }
@@ -202,6 +203,7 @@ namespace NKikimr {
 
         void HandleYardResponse(NPDisk::TEvChunkWriteResult::TPtr& ev, const TActorContext &ctx) {
             --PendingResponses;
+            HullCtx->VCtx->CostTracker->CountPDiskResponse();
             CHECK_PDISK_RESPONSE(HullCtx->VCtx, ev, ctx);
             if (FinalizeIfAborting(ctx)) {
                 return;

+ 2 - 0
ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp

@@ -11,6 +11,7 @@ namespace NKikimr {
         Send(ScrubCtx->PDiskCtx->PDiskId, msg.release());
         CurrentState = TStringBuilder() << "reading data from " << part.ToString();
         auto res = WaitForPDiskEvent<NPDisk::TEvChunkReadResult>();
+        ScrubCtx->VCtx->CostTracker->CountPDiskResponse();
         auto *m = res->Get();
         Y_VERIFY_S(m->Status == NKikimrProto::OK || m->Status == NKikimrProto::CORRUPTED,
             "Status# " << NKikimrProto::EReplyStatus_Name(m->Status));
@@ -41,6 +42,7 @@ namespace NKikimr {
         Send(ScrubCtx->PDiskCtx->PDiskId, msg.release());
         CurrentState = TStringBuilder() << "writing index to " << part.ToString();
         auto res = WaitForPDiskEvent<NPDisk::TEvChunkWriteResult>();
+        ScrubCtx->VCtx->CostTracker->CountPDiskResponse();
         Y_ABORT_UNLESS(res->Get()->Status == NKikimrProto::OK); // FIXME: good logic
     }
 

+ 1 - 0
ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp

@@ -1632,6 +1632,7 @@ namespace NKikimr {
             extQueue.Completed(ctx, msgCtx, event);
             TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId);
             intQueue.Completed(ctx, msgCtx, *this, id);
+            VCtx->CostTracker->CountPDiskResponse();
             TActivationContext::Send(event.release());
         }
 

+ 64 - 0
ydb/core/util/hp_timer_helpers.h

@@ -0,0 +1,64 @@
+#include <util/system/hp_timer.h>
+#include <ydb/library/actors/util/datetime.h>
+
+namespace NKikimr {
+
+inline NHPTimer::STime HPNow() {
+    NHPTimer::STime ret;
+    GetTimeFast(&ret);
+    return ret;
+}
+
+inline double HPSecondsFloat(i64 cycles) {
+    if (cycles > 0) {
+        return double(cycles) / NHPTimer::GetClockRate();
+    } else {
+        return 0.0;
+    }
+}
+
+inline double HPMilliSecondsFloat(i64 cycles) {
+    if (cycles > 0) {
+        return double(cycles) * 1000.0 / NHPTimer::GetClockRate();
+    } else {
+        return 0;
+    }
+}
+
+inline ui64 HPMilliSeconds(i64 cycles) {
+    return (ui64)HPMilliSecondsFloat(cycles);
+}
+
+inline ui64 HPMicroSecondsFloat(i64 cycles) {
+    if (cycles > 0) {
+        return double(cycles) * 1000000.0 / NHPTimer::GetClockRate();
+    } else {
+        return 0;
+    }
+}
+
+inline ui64 HPMicroSeconds(i64 cycles) {
+    return (ui64)HPMicroSecondsFloat(cycles);
+}
+
+inline ui64 HPNanoSeconds(i64 cycles) {
+    if (cycles > 0) {
+        return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate());
+    } else {
+        return 0;
+    }
+}
+
+inline ui64 HPCyclesNs(ui64 ns) {
+    return ui64(NHPTimer::GetClockRate() * double(ns) / 1000000000.0);
+}
+
+inline ui64 HPCyclesUs(ui64 us) {
+    return ui64(NHPTimer::GetClockRate() * double(us) / 1000000.0);
+}
+
+inline ui64 HPCyclesMs(ui64 ms) {
+    return ui64(NHPTimer::GetClockRate() * double(ms) / 1000.0);
+}
+
+} // namespace NKikimr

+ 213 - 0
ydb/core/util/light.h

@@ -0,0 +1,213 @@
+#include <ydb/core/mon/mon.h>
+
+#include "hp_timer_helpers.h"
+
+namespace NKikimr {
+
+class TLightBase {
+protected:
+    TString Name;
+    ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red)
+    ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state
+    ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state
+    ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state
+private:
+    ui64 RedCycles = 0;
+    ui64 GreenCycles = 0;
+    NHPTimer::STime AdvancedTill = 0;
+    NHPTimer::STime LastNow = 0;
+    ui64 UpdateThreshold = 0;
+public:
+    void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) {
+        Name = name;
+        State = counters->GetCounter(name + "_state");
+        Count = counters->GetCounter(name + "_count", true);
+        RedMs = counters->GetCounter(name + "_redMs", true);
+        GreenMs = counters->GetCounter(name + "_greenMs", true);
+        UpdateThreshold = HPCyclesMs(100);
+        AdvancedTill = Now();
+    }
+
+    void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName,
+            const TString& redMsName,const TString& greenMsName) {
+        Count = counters->GetCounter(countName, true);
+        RedMs = counters->GetCounter(redMsName, true);
+        GreenMs = counters->GetCounter(greenMsName, true);
+        UpdateThreshold = HPCyclesMs(100);
+        AdvancedTill = Now();
+    }
+
+    ui64 GetCount() const {
+        return *Count;
+    }
+
+    ui64 GetRedMs() const {
+        return *RedMs;
+    }
+
+    ui64 GetGreenMs() const {
+        return *GreenMs;
+    }
+protected:
+    void Modify(bool state, bool prevState) {
+        if (state && !prevState) { // Switched to ON state
+            if (State) {
+                *State = true;
+            }
+            (*Count)++;
+            return;
+        }
+        if (!state && prevState) { // Switched to OFF state
+            if (State) {
+                *State = false;
+            }
+            return;
+        }
+    }
+
+    void Advance(bool state, NHPTimer::STime now) {
+        if (now == AdvancedTill) {
+            return;
+        }
+        Elapsed(state, now - AdvancedTill);
+        if (RedCycles > UpdateThreshold) {
+            *RedMs += CutMs(RedCycles);
+        }
+        if (GreenCycles > UpdateThreshold) {
+            *GreenMs += CutMs(GreenCycles);
+        }
+        AdvancedTill = now;
+    }
+
+    NHPTimer::STime Now() {
+        // Avoid time going backwards
+        NHPTimer::STime now = HPNow();
+        if (now < LastNow) {
+            now = LastNow;
+        }
+        LastNow = now;
+        return now;
+    }
+private:
+    void Elapsed(bool state, ui64 cycles) {
+        if (state) {
+            RedCycles += cycles;
+        } else {
+            GreenCycles += cycles;
+        }
+    }
+
+    ui64 CutMs(ui64& src) {
+        ui64 ms = HPMilliSeconds(src);
+        ui64 cycles = HPCyclesMs(ms);
+        src -= cycles;
+        return ms;
+    }
+};
+
+// Thread-safe light
+class TLight : public TLightBase {
+private:
+    struct TItem {
+        bool State;
+        bool Filled;
+        TItem(bool state = false, bool filled = false)
+            : State(state)
+            , Filled(filled)
+        {}
+    };
+
+    // Cyclic buffer to enforce event ordering by seqno
+    TSpinLock Lock;
+    size_t HeadIdx = 0; // Index of current state
+    size_t FilledCount = 0;
+    ui16 Seqno = 0; // Current seqno
+    TStackVec<TItem, 32> Data; // In theory should have not more than thread count items
+public:
+    TLight() {
+        InitData();
+    }
+
+    void Set(bool state, ui16 seqno) {
+        TGuard<TSpinLock> g(Lock);
+        Push(state, seqno);
+        bool prevState;
+        // Note that 'state' variable is being reused
+        NHPTimer::STime now = Now();
+        while (Pop(state, prevState)) {
+            Modify(state, prevState);
+            Advance(prevState, now);
+        }
+    }
+
+    void Update() {
+        TGuard<TSpinLock> g(Lock);
+        Advance(Data[HeadIdx].State, Now());
+    }
+
+private:
+    void InitData(bool state = false, bool filled = false) {
+        Data.clear();
+        Data.emplace_back(state, filled);
+        Data.resize(32);
+        HeadIdx = 0;
+    }
+
+    void Push(bool state, ui16 seqno) {
+        FilledCount++;
+        if (FilledCount == 1) { // First event must initialize seqno
+            Seqno = seqno;
+            InitData(state, true);
+            if (state) {
+                Modify(true, false);
+            }
+            return;
+        }
+        Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
+                 (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
+        ui16 diff = seqno;
+        diff -= Seqno; // Underflow is fine
+        size_t size = Data.size();
+        if (size <= diff) { // Buffer is full -- extend and move wrapped part
+            Data.resize(size * 2);
+            for (size_t i = 0; i < HeadIdx; i++) {
+                Data[size + i] = Data[i];
+                Data[i].Filled = false;
+            }
+        }
+        TItem& item = Data[(HeadIdx + diff) % Data.size()];
+        Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
+                 (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
+        item.Filled = true;
+        item.State = state;
+    }
+
+    bool Pop(bool& state, bool& prevState) {
+        size_t nextIdx = (HeadIdx + 1) % Data.size();
+        TItem& head = Data[HeadIdx];
+        TItem& next = Data[nextIdx];
+        if (!head.Filled || !next.Filled) {
+            return false;
+        }
+        state = next.State;
+        prevState = head.State;
+        head.Filled = false;
+        HeadIdx = nextIdx;
+        Seqno++; // Overflow is fine
+        FilledCount--;
+        if (FilledCount == 1 && Data.size() > 32) {
+            InitData(state, true);
+        }
+        return true;
+    }
+
+    size_t CountFilled() const {
+        size_t ret = 0;
+        for (const TItem& item : Data) {
+            ret += item.Filled;
+        }
+        return ret;
+    }
+};
+
+} // namespace NKikimr