Browse Source

Use lock-free Bucket in CostTracker with limited underflow (#8637)

Sergey Belyakov 6 months ago
parent
commit
d22f0864ea

+ 2 - 2
ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp

@@ -47,11 +47,11 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
     : GroupType(groupType)
     , CostCounters(counters->GetSubgroup("subsystem", "advancedCost"))
     , MonGroup(std::make_shared<NMonGroup::TCostTrackerGroup>(CostCounters))
-    , Bucket(&DiskTimeAvailable, &BucketCapacity, nullptr, nullptr, nullptr, nullptr, true)
+    , Bucket(BucketUpperLimit, BucketLowerLimit, DiskTimeAvailable)
     , BurstThresholdNs(costMetricsParameters.BurstThresholdNs)
     , DiskTimeAvailableScale(costMetricsParameters.DiskTimeAvailableScale)
 {
-    AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs);
+    BucketUpperLimit.store(BurstThresholdNs * GetDiskTimeAvailableScale());
     BurstDetector.Initialize(CostCounters, "BurstDetector");
     switch (GroupType.GetErasure()) {
     case TBlobStorageGroupType::ErasureMirror3dc:

+ 14 - 10
ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h

@@ -8,6 +8,7 @@
 #include "vdisk_performance_params.h"
 
 #include <library/cpp/bucket_quoter/bucket_quoter.h>
+#include <ydb/library/lockfree_bucket/lockfree_bucket.h>
 #include <util/system/compiler.h>
 #include <ydb/core/base/blobstorage.h>
 #include <ydb/core/blobstorage/base/blobstorage_events.h>
@@ -315,12 +316,14 @@ private:
     TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
     std::shared_ptr<NMonGroup::TCostTrackerGroup> MonGroup;
 
-    TAtomic BucketCapacity = 1'000'000'000;  // 10^9 nsec
-    TAtomic DiskTimeAvailable = 1'000'000'000;
-    TBucketQuoter<i64, TSpinLock, TAppDataTimerMs<TInstantTimerMs>> Bucket;
+    const double BucketRelativeMinimum = 2;
+    std::atomic<i64> BucketUpperLimit = 1'000'000'000;  // 10^9 nsec
+    std::atomic<i64> BucketLowerLimit = 1'000'000'000 * -BucketRelativeMinimum;
+    std::atomic<ui64> DiskTimeAvailable = 1'000'000'000;
+
+    TLockFreeBucket<TAppDataTimerMs<TInstantTimerMs>> Bucket;
     TLight BurstDetector;
     std::atomic<ui64> SeqnoBurstDetector = 0;
-    static constexpr ui32 ConcurrentHugeRequestsAllowed = 3;
 
     TMemorizableControlWrapper BurstThresholdNs;
     TMemorizableControlWrapper DiskTimeAvailableScale;
@@ -349,15 +352,16 @@ public:
     }
 
     void CountRequest(ui64 cost) {
-        AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now()));
-        Bucket.UseAndFill(cost);
-        BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
+        i64 bucketCapacity = GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now());
+        BucketUpperLimit.store(bucketCapacity);
+        BucketLowerLimit.store(bucketCapacity * -BucketRelativeMinimum);
+        Bucket.FillAndTake(cost);
+        BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
     }
 
     void SetTimeAvailable(ui64 diskTimeAvailableNSec) {
         ui64 diskTimeAvailable = diskTimeAvailableNSec * GetDiskTimeAvailableScale();
-
-        AtomicSet(DiskTimeAvailable, diskTimeAvailable);
+        DiskTimeAvailable.store(diskTimeAvailable);
         MonGroup->DiskTimeAvailableCtr() = diskTimeAvailable;
     }
 
@@ -408,7 +412,7 @@ public:
     }
 
     void CountPDiskResponse() {
-        BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
+        BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
     }
 
 private:

+ 1 - 0
ydb/library/lockfree_bucket/lockfree_bucket.cpp

@@ -0,0 +1 @@
+#include "lockfree_bucket.h"

+ 66 - 0
ydb/library/lockfree_bucket/lockfree_bucket.h

@@ -0,0 +1,66 @@
+#pragma once
+
+#include <atomic>
+#include <limits>
+
+#include <util/datetime/base.h>
+
+template<class TTimer> 
+class TLockFreeBucket {
+public:
+    TLockFreeBucket(std::atomic<i64>& maxTokens, std::atomic<i64>& minTokens, std::atomic<ui64>& inflowPerSecond)
+        : MaxTokens(maxTokens)
+        , MinTokens(minTokens)
+        , InflowPerSecond(inflowPerSecond)
+        , Tokens(maxTokens.load())
+    {
+        Y_DEBUG_ABORT_UNLESS(maxTokens > 0);
+        Y_DEBUG_ABORT_UNLESS(minTokens < 0);
+    }
+
+    bool IsEmpty() {
+        FillBucket();
+        return Tokens.load() <= 0;
+    }
+
+    void FillAndTake(i64 tokens) {
+        FillBucket();
+        TakeTokens(tokens);
+    }
+
+private:
+    void FillBucket() {
+        TTime prev;
+        TTime now;
+        for (prev = LastUpdate.load(), now = TTimer::Now(); !LastUpdate.compare_exchange_strong(prev, now); ) {}
+
+        ui64 rawInflow = InflowPerSecond.load() * TTimer::Duration(prev, now);
+        if (rawInflow >= TTimer::Resolution) {
+            Tokens.fetch_add(rawInflow / TTimer::Resolution);
+            for (i64 tokens = Tokens.load(), maxTokens = MaxTokens.load(); tokens > maxTokens; ) {
+                if (Tokens.compare_exchange_strong(tokens, maxTokens)) {
+                    break;
+                }
+            }
+        }
+    }
+
+    void TakeTokens(i64 tokens) {
+        Tokens.fetch_sub(tokens);
+        for (i64 tokens = Tokens.load(), minTokens = MinTokens.load(); tokens < minTokens; ) {
+            if (Tokens.compare_exchange_strong(tokens, minTokens)) {
+                break;
+            }
+        }
+    }
+
+private:
+    using TTime = typename TTimer::TTime;
+
+    std::atomic<i64>& MaxTokens;
+    std::atomic<i64>& MinTokens;
+    std::atomic<ui64>& InflowPerSecond;
+
+    std::atomic<i64> Tokens;
+    std::atomic<TTime> LastUpdate;
+};

+ 128 - 0
ydb/library/lockfree_bucket/ut/main.cpp

@@ -0,0 +1,128 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/library/lockfree_bucket/lockfree_bucket.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+#include <util/system/types.h>
+
+#include <thread>
+
+struct TTestTimerMs {
+    using TTime = TInstant;
+    static constexpr ui64 Resolution = 1000ull; // milliseconds
+
+    static TTime Now() {
+        return TInstant::Zero() + TDuration::MilliSeconds(Time.load());
+    }
+
+    static ui64 Duration(TTime from, TTime to) {
+        return (to - from).MilliSeconds();
+    }
+
+    static std::atomic<ui64> Time;
+
+    static void Reset() {
+        Time.store(0);
+    }
+
+    static void Advance(TDuration delta) {
+        Time.fetch_add(delta.MilliSeconds());
+    }
+};
+
+std::atomic<ui64> TTestTimerMs::Time = {};
+
+Y_UNIT_TEST_SUITE(TLockFreeBucket) {
+    struct TTestContext {
+        TTestContext() {
+            MaxTokens.store(1'000'000);
+            MinTokens.store(-1'000'000);
+            Inflow.store(1'000'000);
+            TTestTimerMs::Reset();
+        }
+
+        template<class TCallback>
+        void Initialize(TCallback callback, ui32 threadCount) {
+            for (ui32 i = 0; i < threadCount; ++i) {
+                Threads.emplace_back(callback);
+            }
+        }
+
+        ~TTestContext() {
+            JoinAll();
+        }
+
+        void JoinAll() {
+            for (std::thread& t : Threads) {
+                t.join();
+            }
+            Threads.clear();
+        }
+
+        std::atomic<i64> MaxTokens;
+        std::atomic<i64> MinTokens;
+        std::atomic<ui64> Inflow;
+        std::vector<std::thread> Threads;
+    };
+
+    void TestLowerLimit(ui32 threadCount) {
+        TTestContext ctx;
+        TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);
+
+        auto worker = [&]() {
+            for (ui32 i = 0; i < 100; ++i) {
+                TTestTimerMs::Advance(TDuration::MilliSeconds(10));
+                bucket.FillAndTake(123'456);
+            }
+        };
+
+        ctx.Initialize(worker, threadCount);
+        ctx.JoinAll();
+
+        TTestTimerMs::Advance(TDuration::Seconds(1));
+        TTestTimerMs::Advance(TDuration::MilliSeconds(1));
+
+        UNIT_ASSERT(!bucket.IsEmpty());
+    }
+
+    void TestUpperLimit(ui32 tokensTaken, bool isEmpty, ui32 threadCount) {
+        TTestContext ctx;
+        TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);
+        TTestTimerMs::Advance(TDuration::Seconds(100500));
+
+        auto worker = [&]() {
+            for (ui32 i = 0; i < 100; ++i) {
+                TTestTimerMs::Advance(TDuration::MilliSeconds(10));
+                bucket.FillAndTake(tokensTaken);
+            }
+        };
+
+        ctx.Initialize(worker, threadCount);
+        ctx.JoinAll();
+
+        UNIT_ASSERT_VALUES_EQUAL(bucket.IsEmpty(), isEmpty);
+    }
+
+    Y_UNIT_TEST(LowerLimitSingleThreaded) {
+        TestLowerLimit(1);
+    }
+
+    Y_UNIT_TEST(LowerLimitMultiThreaded) {
+        TestLowerLimit(20);
+    }
+
+    Y_UNIT_TEST(UpperLimitSingleThreaded) {
+        TestUpperLimit(123'456, true, 1);
+    }
+
+    Y_UNIT_TEST(UpperLimitMultiThreaded) {
+        TestUpperLimit(123'456, true, 20);
+    }
+
+    Y_UNIT_TEST(LowIntakeSingleThreaded) {
+        TestUpperLimit(1, false, 1);
+    }
+
+    Y_UNIT_TEST(LowIntakeMultiThreaded) {
+        TestUpperLimit(1, false, 20);
+    }
+}

+ 11 - 0
ydb/library/lockfree_bucket/ut/ya.make

@@ -0,0 +1,11 @@
+UNITTEST()
+
+FORK_SUBTESTS()
+SRCS(
+    main.cpp
+)
+PEERDIR(
+    ydb/library/lockfree_bucket
+)
+
+END()

+ 12 - 0
ydb/library/lockfree_bucket/ya.make

@@ -0,0 +1,12 @@
+LIBRARY()
+
+
+SRCS(
+    lockfree_bucket.cpp
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    ut
+)

+ 1 - 0
ydb/library/ya.make

@@ -14,6 +14,7 @@ RECURSE(
     grpc
     http_proxy
     keys
+    lockfree_bucket
     logger
     login
     mkql_proto