Browse Source

Intermediate changes

robot-piglet 1 year ago
parent
commit
94a99bda49

+ 1 - 3
yt/yt/core/actions/invoker_pool.h

@@ -2,8 +2,6 @@
 
 #include "public.h"
 
-#include <yt/yt/core/misc/historic_usage_aggregator.h>
-
 namespace NYT {
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -77,7 +75,7 @@ public:
         requires TEnumTraits<E>::IsEnum
     TInvokerStatistics GetInvokerStatistics(E index) const;
 
-    virtual void UpdateActionTimeAggregatorParameters(THistoricUsageAggregationParameters newParameters) = 0;
+    virtual void UpdateActionTimeRelevancyHalflife(TDuration newHalflife) = 0;
 
 protected:
     virtual TInvokerStatistics DoGetInvokerStatistics(int index) const = 0;

+ 13 - 18
yt/yt/core/concurrency/fair_share_invoker_pool.cpp

@@ -1,10 +1,9 @@
 #include "fair_share_invoker_pool.h"
 
-#include "scheduler.h"
-
 #include <yt/yt/core/actions/current_invoker.h>
 #include <yt/yt/core/actions/invoker_detail.h>
 
+#include <yt/yt/core/misc/finally.h>
 #include <yt/yt/core/misc/ring_queue.h>
 
 #include <yt/yt/core/profiling/timing.h>
@@ -129,7 +128,7 @@ public:
         IInvokerPtr underlyingInvoker,
         int invokerCount,
         TFairShareCallbackQueueFactory callbackQueueFactory,
-        THistoricUsageAggregationParameters aggregatorParameters)
+        TDuration actionTimeRelevancyHalflife)
         : UnderlyingInvoker_(std::move(underlyingInvoker))
         , Queue_(callbackQueueFactory(invokerCount))
     {
@@ -137,16 +136,16 @@ public:
         InvokerQueueStates_.reserve(invokerCount);
         for (int index = 0; index < invokerCount; ++index) {
             Invokers_.push_back(New<TInvoker>(UnderlyingInvoker_, index, MakeWeak(this)));
-            InvokerQueueStates_.emplace_back(aggregatorParameters);
+            InvokerQueueStates_.emplace_back(actionTimeRelevancyHalflife);
         }
     }
 
-    void UpdateActionTimeAggregatorParameters(THistoricUsageAggregationParameters newParameters) override
+    void UpdateActionTimeRelevancyHalflife(TDuration newHalflife) override
     {
         auto guard = Guard(InvokerQueueStatesLock_);
 
         for (auto& queueState : InvokerQueueStates_) {
-            queueState.UpdateActionTimeAggregatorParameters(newParameters);
+            queueState.UpdateActionTimeRelevancyHalflife(newHalflife);
         }
     }
 
@@ -200,8 +199,8 @@ private:
     {
     public:
         explicit TInvokerQueueState(
-            const THistoricUsageAggregationParameters& parameters)
-            : AverageTimeAggregator_(parameters)
+            TDuration halflife)
+            : AverageTimeAggregator_(halflife)
         { }
 
         void OnActionEnqueued(TInstant now)
@@ -240,17 +239,13 @@ private:
             };
         }
 
-        void UpdateActionTimeAggregatorParameters(THistoricUsageAggregationParameters newParameters)
+        void UpdateActionTimeRelevancyHalflife(TDuration newHalflife)
         {
-            AverageTimeAggregator_.UpdateParameters(THistoricUsageAggregationParameters{
-                newParameters.Mode,
-                newParameters.EmaAlpha,
-                /*resetOnNewParameters*/ false,
-            });
+            AverageTimeAggregator_.SetHalflife(newHalflife, /*resetOnNewHalflife*/ false);
         }
 
     private:
-        THistoricUsageAggregator AverageTimeAggregator_;
+        TAdjustedExponentialMovingAverage AverageTimeAggregator_;
         TRingQueue<TInstant> ActionEnqueueTimes_;
 
         i64 EnqueuedActionCount_ = 0;
@@ -262,7 +257,7 @@ private:
             //! to account for the case when everything was fine
             //! and then invoker gets stuck for a very long time.
             auto maxWaitTime = GetMaxWaitTimeInQueue(now);
-            auto totalWaitTimeMilliseconds = AverageTimeAggregator_.SimulateUpdate(now, maxWaitTime.MillisecondsFloat());
+            auto totalWaitTimeMilliseconds = AverageTimeAggregator_.EstimateAverageWithNewValue(now, maxWaitTime.MillisecondsFloat());
             return TDuration::MilliSeconds(totalWaitTimeMilliseconds);
         }
 
@@ -387,14 +382,14 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool(
     IInvokerPtr underlyingInvoker,
     int invokerCount,
     TFairShareCallbackQueueFactory callbackQueueFactory,
-    THistoricUsageAggregationParameters aggregatorParameters)
+    TDuration actionTimeRelevancyHalflife)
 {
     YT_VERIFY(0 < invokerCount && invokerCount < 100);
     return New<TFairShareInvokerPool>(
         std::move(underlyingInvoker),
         invokerCount,
         std::move(callbackQueueFactory),
-        aggregatorParameters);
+        actionTimeRelevancyHalflife);
 }
 
 ////////////////////////////////////////////////////////////////////////////////

+ 2 - 5
yt/yt/core/concurrency/fair_share_invoker_pool.h

@@ -6,6 +6,7 @@
 #include <yt/yt/core/actions/invoker_pool.h>
 #include <yt/yt/core/actions/public.h>
 
+#include <yt/yt/core/misc/adjusted_exponential_moving_average.h>
 #include <yt/yt/core/misc/public.h>
 
 #include <yt/yt/core/profiling/public.h>
@@ -52,11 +53,7 @@ TDiagnosableInvokerPoolPtr CreateFairShareInvokerPool(
     IInvokerPtr underlyingInvoker,
     int invokerCount,
     TFairShareCallbackQueueFactory callbackQueueFactory = CreateFairShareCallbackQueue,
-    THistoricUsageAggregationParameters aggregatorParameters =
-        THistoricUsageAggregationParameters{
-            EHistoricUsageAggregationMode::ExponentialMovingAverage,
-            /*emaAlpha*/ THistoricUsageAggregationParameters::DefaultEmaAlpha,
-        });
+    TDuration actionTimeRelevancyHalflife = TAdjustedExponentialMovingAverage::DefaultHalflife);
 
 ////////////////////////////////////////////////////////////////////////////////
 

+ 159 - 100
yt/yt/core/concurrency/unittests/fair_share_invoker_pool_ut.cpp

@@ -288,89 +288,6 @@ protected:
         ExpectTotalCpuTime(1, TDuration::Zero());
     }
 
-    void DoTestGetTotalWaitEstimate(int invokerCount, std::vector<int> waitingActionCounts)
-    {
-        YT_VERIFY(std::ssize(waitingActionCounts) == invokerCount);
-
-        auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), invokerCount);
-        invokerPool->UpdateActionTimeAggregatorParameters(THistoricUsageAggregationParameters(
-            EHistoricUsageAggregationMode::ExponentialMovingAverage,
-            /*emaAlpha=*/ 0.5));
-
-        // Test plan:
-        // - Each invoker in the pool will have a blocker action followed by |waitingActionCounts[i]| waiting actions.
-        // - Testing is done in |invokerCount| stages:
-        //   (1) The blocker action of the i-th invoker starts and triggers the |stageStartedEvents[i]|.
-        //   (2) We check current average wait time returned by every invoker.
-        //   (3) We trigger |stageFinishedEvents[i]| to release the blocker action of the i-th invoker.
-
-        std::vector<NThreading::TEvent> stageStartedEvents(invokerCount);
-        std::vector<NThreading::TEvent> stageFinishedEvents(invokerCount);
-        std::vector<TInstant> blockingActionEnqueueTimes;
-        std::vector<TInstant> blockingActionDequeueTimes;
-        std::vector<TFuture<void>> blockingActionFutures;
-        std::vector<std::vector<TInstant>> waitingActionEnqueueTimesPerInvoker(invokerCount);
-        std::vector<std::vector<TFuture<void>>> waitingActionFuturesPerInvoker(invokerCount);
-
-        // Enqueue actions to invokers.
-        for (int invokerIndex = 0; invokerIndex < invokerCount; ++invokerIndex) {
-            blockingActionEnqueueTimes.push_back(NProfiling::GetInstant());
-            blockingActionFutures.emplace_back(
-                BIND([&stageFinishedEvents, &stageStartedEvents, invokerIndex] {
-                    stageStartedEvents[invokerIndex].NotifyOne();
-                    YT_VERIFY(stageFinishedEvents[invokerIndex].Wait(Quantum * 100));
-                })
-                .AsyncVia(invokerPool->GetInvoker(invokerIndex))
-                .Run());
-            Spin(Quantum);
-            // Invoker has zero average wait time.
-            auto waitingActionCount = waitingActionCounts[invokerIndex];
-            auto& waitingActionEnqueueTimes = waitingActionEnqueueTimesPerInvoker[invokerIndex];
-            auto& waitingActionFutures = waitingActionFuturesPerInvoker[invokerIndex];
-
-            for (int i = 0; i < waitingActionCount; ++i) {
-                waitingActionEnqueueTimes.push_back(NProfiling::GetInstant());
-                waitingActionFutures.emplace_back(BIND([] {}).AsyncVia(invokerPool->GetInvoker(invokerIndex)).Run());
-
-                Spin(Quantum);
-            }
-        }
-
-        // Test average wait time.
-        for (int stage = 0; stage < invokerCount; ++stage) {
-            YT_VERIFY(stageStartedEvents[stage].Wait(Quantum * 100));
-
-            // Collect average wait times.
-            std::vector<TDuration> averageWaitTimes(invokerCount);
-            for (int invokerIndex = 0; invokerIndex < invokerCount; ++invokerIndex) {
-                averageWaitTimes[invokerIndex] = invokerPool->GetInvokerStatistics(invokerIndex).TotalTimeEstimate;
-            }
-            // Release invoker.
-            stageFinishedEvents[stage].NotifyOne();
-            blockingActionDequeueTimes.push_back(NProfiling::GetInstant());
-
-            for (int invokerIndex = 0; invokerIndex < invokerCount; ++invokerIndex) {
-                auto actualWaitTime = averageWaitTimes[invokerIndex];
-
-                bool stillWaiting = invokerIndex > stage;
-                auto expectedWaitTime =
-                    blockingActionDequeueTimes[stillWaiting ? stage : invokerIndex] -
-                    blockingActionEnqueueTimes[invokerIndex];
-
-                EXPECT_GE(actualWaitTime, expectedWaitTime - Margin)
-                    << TError("Stage: %v, Invoker: %v", stage, invokerIndex).GetMessage();
-                EXPECT_LE(actualWaitTime, expectedWaitTime + Margin)
-                    << TError("Stage: %v, Invoker: %v", stage, invokerIndex).GetMessage();
-            }
-        }
-
-        // Wait for all actions to finish.
-        WaitFor(AllSet(blockingActionFutures)).ThrowOnError();
-        for (int invokerIndex = 0; invokerIndex < invokerCount; ++invokerIndex) {
-            WaitFor(AllSet(waitingActionFuturesPerInvoker[invokerIndex])).ThrowOnError();
-        }
-    }
-
     static void Spin(TDuration duration)
     {
         NProfiling::TFiberWallTimer timer;
@@ -521,7 +438,7 @@ TEST_F(TFairShareInvokerPoolTest, CpuTimeAccountingBetweenContextSwitchesIsNotSu
     future.Get().ThrowOnError();
 }
 
-TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateIsZeroForEmptyPool)
+TEST_F(TFairShareInvokerPoolTest, GetTotalWaitTimeEstimateEmptyPool)
 {
     auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
 
@@ -534,32 +451,174 @@ TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateIsZeroForEmptyPool)
     EXPECT_LE(invokerPool->GetInvokerStatistics(0).TotalTimeEstimate, Quantum + Margin);
 }
 
-TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateOneBucketOneWaitingAction)
+TEST_F(TFairShareInvokerPoolTest, GetTotalWaitTimeEstimateStuckAction)
 {
-    DoTestGetTotalWaitEstimate(
-        /* invokerCount */ 1,
-        /* waitingActionCounts */ {1});
+    auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+    NThreading::TEvent event;
+
+    auto action = BIND([&event]{
+        event.Wait(TDuration::Seconds(100));
+    })
+    .AsyncVia(invokerPool->GetInvoker(0))
+    .Run();
+
+    TDelayedExecutor::WaitForDuration(Quantum);
+
+    auto totalTimeEstimate = invokerPool->GetInvokerStatistics(0).TotalTimeEstimate;
+
+    EXPECT_LE(totalTimeEstimate, Quantum + Margin);
+    EXPECT_GE(totalTimeEstimate, Quantum - Margin);
+
+    event.NotifyAll();
+    WaitFor(std::move(action)).ThrowOnError();
 }
 
-TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateOneBucketTenWaitingActions)
+TEST_F(TFairShareInvokerPoolTest, GetTotalWaitTimeEstimateRelevancyDecay)
 {
-    DoTestGetTotalWaitEstimate(
-        /* invokerCount */ 1,
-        /* waitingActionCounts */ {10});
+    auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+    // Make aggregator very forgetful.
+    invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Zero());
+    NThreading::TEvent event;
+
+    auto action = BIND([&event]{
+        event.Wait(100 * Quantum);
+    })
+    .AsyncVia(invokerPool->GetInvoker(0))
+    .Run();
+
+    TDelayedExecutor::WaitForDuration(Quantum);
+
+    auto totalTimeEstimate = invokerPool->GetInvokerStatistics(0).TotalTimeEstimate;
+
+    EXPECT_LE(totalTimeEstimate, Quantum + Margin);
+    EXPECT_GE(totalTimeEstimate, Quantum - Margin);
+
+    event.NotifyAll();
+    WaitFor(std::move(action)).ThrowOnError();
+
+    TDelayedExecutor::WaitForDuration(Quantum);
+
+    EXPECT_LE(invokerPool->GetInvokerStatistics(0).TotalTimeEstimate, Margin);
 }
 
-TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateTwoBuckets)
+TEST_F(TFairShareInvokerPoolTest, GetTotalWaitTimeEstimateSeveralActions)
 {
-    DoTestGetTotalWaitEstimate(
-        /* invokerCount */ 2,
-        /* waitingActionCounts */ {4, 8});
+    static constexpr int ActionCount = 3;
+
+    auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 1);
+    // Make aggregator never forget a sample.
+    invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Days(100000000000000000));
+
+    std::vector<NThreading::TEvent> leashes(ActionCount);
+    std::vector<TFuture<void>> actions;
+
+    for (int idx = 0; idx < ActionCount; ++idx) {
+        actions.emplace_back(BIND([&leashes, idx] {
+            leashes[idx].Wait(100 * Quantum);
+        })
+        .AsyncVia(invokerPool->GetInvoker(0))
+        .Run());
+    }
+
+    auto expectedTotalTimeEstimate = TDuration::Zero();
+    auto start = GetInstant();
+
+    for (int idx = 0; idx < ActionCount; ++idx) {
+        TDelayedExecutor::WaitForDuration(Quantum);
+
+        auto statistics = invokerPool->GetInvokerStatistics(0);
+        auto expectedTotalTime = GetInstant() - start;
+
+        if (idx == 0) {
+            expectedTotalTimeEstimate = expectedTotalTime;
+        } else {
+            expectedTotalTimeEstimate = (expectedTotalTimeEstimate * idx + expectedTotalTime) / (idx + 1.0);
+        }
+
+        EXPECT_EQ(statistics.WaitingActionCount, ActionCount - idx);
+
+        EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin)
+            << TError("Index: %v.", idx).GetMessage();
+        EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin)
+            << TError("Index: %v.", idx).GetMessage();
+
+        leashes[idx].NotifyOne();
+        WaitFor(std::move(actions[idx])).ThrowOnError();
+    }
 }
 
-TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateThreeBuckets)
+TEST_F(TFairShareInvokerPoolTest, GetTotalWaitEstimateUncorrelatedWithOtherInvokers)
 {
-    DoTestGetTotalWaitEstimate(
-        /* invokerCount */ 3,
-        /* waitingActionCounts */ {1, 2, 3});
+    auto executionOrderEnforcer = [] (int suggestedStep) {
+        static int realStep = 0;
+        EXPECT_EQ(realStep, suggestedStep);
+        ++realStep;
+    };
+    auto invokerPool = CreateInvokerPool(Queues_[0]->GetInvoker(), 2);
+    // Make aggregator never forget a sample.
+    invokerPool->UpdateActionTimeRelevancyHalflife(TDuration::Days(100000000000000000));
+
+    std::vector<NThreading::TEvent> leashes(2);
+    std::vector<TFuture<void>> actions;
+
+    for (int idx = 0; idx < 2; ++idx) {
+        actions.emplace_back(BIND([&executionOrderEnforcer, &leashes, idx] {
+            if (idx == 0) {
+                executionOrderEnforcer(0);
+            } else {
+                executionOrderEnforcer(2);
+            }
+            leashes[idx].Wait(100 * Quantum);
+        })
+        .AsyncVia(invokerPool->GetInvoker(0))
+        .Run());
+    }
+
+    NThreading::TEvent secondaryLeash;
+    auto secondaryAction = BIND([&executionOrderEnforcer, &secondaryLeash] {
+        executionOrderEnforcer(1);
+        secondaryLeash.Wait(100 * Quantum);
+    }).AsyncVia(invokerPool->GetInvoker(1)).Run();
+
+    auto start = GetInstant();
+
+    TDelayedExecutor::WaitForDuration(Quantum);
+
+    auto statistics = invokerPool->GetInvokerStatistics(0);
+    auto expectedTotalTimeEstimate = GetInstant() - start;
+
+    EXPECT_EQ(statistics.WaitingActionCount, 2);
+    EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin);
+    EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin);
+
+    leashes[0].NotifyOne();
+    WaitFor(std::move(actions[0])).ThrowOnError();
+
+    // Second action will not be executed until the secondary action is released.
+
+    leashes[1].NotifyOne();
+    TDelayedExecutor::WaitForDuration(10 * Quantum);
+    EXPECT_FALSE(actions[1].IsSet());
+
+    // Release Secondary action.
+
+    auto secondaryStatistics = invokerPool->GetInvokerStatistics(1);
+    auto secondaryWaitTime = GetInstant() - start;
+
+    EXPECT_EQ(secondaryStatistics.WaitingActionCount, 1);
+    EXPECT_LE(secondaryStatistics.TotalTimeEstimate, secondaryWaitTime + Margin);
+    EXPECT_GE(secondaryStatistics.TotalTimeEstimate, secondaryWaitTime - Margin);
+
+    secondaryLeash.NotifyOne();
+    WaitFor(std::move(secondaryAction)).ThrowOnError();
+    WaitFor(std::move(actions[1])).ThrowOnError();
+
+    statistics = invokerPool->GetInvokerStatistics(0);
+    expectedTotalTimeEstimate = (expectedTotalTimeEstimate + (GetInstant() - start)) / 3.0;
+
+    EXPECT_EQ(statistics.WaitingActionCount, 0);
+    EXPECT_LE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate + Margin);
+    EXPECT_GE(statistics.TotalTimeEstimate, expectedTotalTimeEstimate - Margin);
 }
 
 ////////////////////////////////////////////////////////////////////////////////

+ 133 - 0
yt/yt/core/misc/adjusted_exponential_moving_average.cpp

@@ -0,0 +1,133 @@
+#include "adjusted_exponential_moving_average.h"
+
+#include <yt/yt/core/profiling/timing.h>
+
+#include <library/cpp/yt/assert/assert.h>
+
+#include <util/generic/ymath.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAdjustedExponentialMovingAverage::TAdjustedExponentialMovingAverage(TDuration halflife)
+    : Halflife_(halflife)
+{ }
+
+double TAdjustedExponentialMovingAverage::GetAverage() const
+{
+    return TotalNumerator_ / TotalDenominator_;
+}
+
+void TAdjustedExponentialMovingAverage::UpdateAt(TInstant now, double value)
+{
+    if (now < LastUpdateTime_) {
+        return;
+    }
+
+    std::tie(TotalDenominator_, TotalNumerator_) =
+        ApplyUpdate(TotalDenominator_, TotalNumerator_, now, value);
+    LastUpdateTime_ = now;
+}
+
+//! Simulates UpdateAt(now, value) + GetAverage() without changing the state.
+double TAdjustedExponentialMovingAverage::EstimateAverageWithNewValue(TInstant now, double value) const
+{
+    if (now < LastUpdateTime_) {
+        return GetAverage();
+    }
+
+    auto [denominator, numerator] = ApplyUpdate(TotalDenominator_, TotalNumerator_, now, value);
+    return numerator / denominator;
+}
+
+void TAdjustedExponentialMovingAverage::SetHalflife(TDuration halflife, bool resetOnNewHalflife)
+{
+    if (Halflife_ == halflife) {
+        return;
+    }
+
+    Halflife_ = halflife;
+
+    if (resetOnNewHalflife) {
+        Reset();
+    }
+}
+
+void TAdjustedExponentialMovingAverage::Reset()
+{
+    LastUpdateTime_ = TInstant::Zero();
+    TotalDenominator_ = 1.0;
+    TotalNumerator_ = 0.0;
+}
+
+std::pair<double, double> TAdjustedExponentialMovingAverage::ApplyUpdate(
+    double denominator,
+    double numerator,
+    TInstant now,
+    double value) const
+{
+    if (LastUpdateTime_ == TInstant::Zero() || Halflife_ == TDuration::Zero()) {
+        return std::pair(1.0, value);
+    }
+
+    double secondsPassed = (now - LastUpdateTime_).SecondsFloat();
+    double multiplier = Exp2(-secondsPassed / Halflife_.SecondsFloat());
+
+    denominator = 1.0 + denominator * multiplier;
+    numerator = value + numerator * multiplier;
+
+    return std::pair(denominator, numerator);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAverageAdjustedExponentialMovingAverage::TAverageAdjustedExponentialMovingAverage(
+    TDuration halflife,
+    TDuration period)
+    : Period_(period)
+    , Impl_(halflife)
+{ }
+
+void TAverageAdjustedExponentialMovingAverage::SetHalflife(TDuration halflife, bool resetOnNewHalflife)
+{
+    Impl_.SetHalflife(halflife, resetOnNewHalflife);
+}
+
+double TAverageAdjustedExponentialMovingAverage::GetAverage()
+{
+    auto now = NProfiling::GetInstant();
+    MaybeFlush(now);
+    return Impl_.GetAverage();
+}
+
+void TAverageAdjustedExponentialMovingAverage::UpdateAt(TInstant now, double value)
+{
+    MaybeFlush(now);
+    CurrentUsage_ += value;
+}
+
+void TAverageAdjustedExponentialMovingAverage::MaybeFlush(TInstant now)
+{
+    if (!IntervalStart_ || now < IntervalStart_) {
+        IntervalStart_ = now;
+        return;
+    }
+
+    auto diff = now - IntervalStart_;
+    if (diff < Period_) {
+        return;
+    }
+
+    auto ratio = diff / Period_;
+    auto usagePerPeriod = CurrentUsage_ / ratio;
+
+    Impl_.UpdateAt(now, usagePerPeriod);
+
+    IntervalStart_ = now;
+    CurrentUsage_ = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT

+ 75 - 0
yt/yt/core/misc/adjusted_exponential_moving_average.h

@@ -0,0 +1,75 @@
+#pragma once
+
+#include <util/datetime/base.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAdjustedExponentialMovingAverage
+{
+public:
+    static constexpr auto DefaultHalflife = TDuration::Seconds(10);
+
+    explicit TAdjustedExponentialMovingAverage(TDuration halflife = DefaultHalflife);
+
+    double GetAverage() const;
+
+    void UpdateAt(TInstant now, double value);
+
+    //! Simulates UpdateAt(now, value) + GetAverage() without changing the state.
+    double EstimateAverageWithNewValue(TInstant now, double value) const;
+
+    void SetHalflife(TDuration halflife, bool resetOnNewHalflife = true);
+
+    void Reset();
+
+private:
+    //! Parameter of adjusted exponential moving average.
+    //! It means that weight of the data, recorded Halflife_ seconds ago
+    //! is roughly half of what it was initially.
+    //! Adjusted EMA formula described here: https://clck.ru/37bd2i
+    //! under "adjusted = True" with (1 - alpha) = 2^(-1 / Halflife_).
+    TDuration Halflife_;
+
+    TInstant LastUpdateTime_ = TInstant::Zero();
+
+    double TotalDenominator_ = 1.0;
+    double TotalNumerator_ = 0.0;
+
+    std::pair<double, double> ApplyUpdate(
+        double denominator,
+        double numerator,
+        TInstant now,
+        double value) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAverageAdjustedExponentialMovingAverage
+{
+public:
+    explicit TAverageAdjustedExponentialMovingAverage(
+        TDuration halflife = TAdjustedExponentialMovingAverage::DefaultHalflife,
+        TDuration period = TDuration::Seconds(1));
+
+    void SetHalflife(TDuration halflife, bool resetOnNewHalflife = true);
+
+    double GetAverage();
+
+    void UpdateAt(TInstant now, double value);
+
+private:
+    TDuration Period_;
+
+    TInstant IntervalStart_ = TInstant::Zero();
+    double CurrentUsage_ = 0;
+
+    TAdjustedExponentialMovingAverage Impl_;
+
+    void MaybeFlush(TInstant now);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT

+ 0 - 16
yt/yt/core/misc/config.cpp

@@ -96,22 +96,6 @@ void THistogramDigestConfig::Register(TRegistrar registrar)
 
 ////////////////////////////////////////////////////////////////////////////////
 
-void THistoricUsageConfig::Register(TRegistrar registrar)
-{
-    registrar.Parameter("aggregation_mode", &TThis::AggregationMode)
-        .Default(EHistoricUsageAggregationMode::None);
-
-    registrar.Parameter("ema_alpha", &TThis::EmaAlpha)
-        // TODO(eshcherbin): Adjust.
-        .Default(1.0 / (24.0 * 60.0 * 60.0))
-        .GreaterThanOrEqual(0.0);
-
-    registrar.Parameter("reset_on_new_parameters", &TThis::ResetOnNewParameters)
-        .Default(true);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
 void TAdaptiveHedgingManagerConfig::Register(TRegistrar registrar)
 {
     registrar.Parameter("max_backup_request_ratio", &TThis::MaxBackupRequestRatio)

+ 2 - 31
yt/yt/core/misc/config.h

@@ -42,6 +42,8 @@ struct TConstantBackoffOptions
 
 ////////////////////////////////////////////////////////////////////////////////
 
+//! TODO(arkady-e1ppa): Make configs below pairs of POD-structs and TExternalizedYsonStruct.
+
 class TLogDigestConfig
     : public NYTree::TYsonStruct
 {
@@ -92,35 +94,6 @@ DEFINE_REFCOUNTED_TYPE(THistogramDigestConfig)
 
 ////////////////////////////////////////////////////////////////////////////////
 
-DEFINE_ENUM(EHistoricUsageAggregationMode,
-    ((None)                     (0))
-    ((ExponentialMovingAverage) (1))
-);
-
-// TODO(arkady-e1ppa): Use YsonExternalSerializer from pr5052145 once it's ready.
-class THistoricUsageConfig
-    : public NYTree::TYsonStruct
-{
-public:
-    EHistoricUsageAggregationMode AggregationMode;
-
-    //! Parameter of exponential moving average (EMA) of the aggregated usage.
-    //! Roughly speaking, it means that current usage ratio is twice as relevant for the
-    //! historic usage as the usage ratio alpha seconds ago.
-    //! EMA for unevenly spaced time series was adapted from here: https://clck.ru/HaGZs
-    double EmaAlpha;
-
-    bool ResetOnNewParameters;
-
-    REGISTER_YSON_STRUCT(THistoricUsageConfig);
-
-    static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(THistoricUsageConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
 class TAdaptiveHedgingManagerConfig
     : public virtual NYTree::TYsonStruct
 {
@@ -169,8 +142,6 @@ public:
     static void Register(TRegistrar registrar);
 };
 
-
-
 } // namespace NDetail
 
 ////////////////////////////////////////////////////////////////////////////////

+ 0 - 155
yt/yt/core/misc/historic_usage_aggregator.cpp

@@ -1,155 +0,0 @@
-#include "historic_usage_aggregator.h"
-
-#include <yt/yt/core/profiling/timing.h>
-
-#include <library/cpp/yt/assert/assert.h>
-
-#include <util/generic/ymath.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-THistoricUsageAggregationParameters::THistoricUsageAggregationParameters(
-    EHistoricUsageAggregationMode mode,
-    double emaAlpha,
-    bool resetOnNewParameters)
-    : Mode(mode)
-    , EmaAlpha(emaAlpha)
-    , ResetOnNewParameters(resetOnNewParameters)
-{ }
-
-THistoricUsageAggregationParameters::THistoricUsageAggregationParameters(
-    const THistoricUsageConfigPtr& config)
-    : Mode(config->AggregationMode)
-    , EmaAlpha(config->EmaAlpha)
-    , ResetOnNewParameters(config->ResetOnNewParameters)
-{ }
-
-////////////////////////////////////////////////////////////////////////////////
-
-THistoricUsageAggregator::THistoricUsageAggregator()
-{
-    Reset();
-}
-
-THistoricUsageAggregator::THistoricUsageAggregator(
-    const THistoricUsageAggregationParameters& parameters)
-    : Parameters_(parameters)
-{
-    Reset();
-}
-
-void THistoricUsageAggregator::UpdateParameters(
-    const THistoricUsageAggregationParameters& newParameters)
-{
-    if (Parameters_ == newParameters) {
-        return;
-    }
-
-    Parameters_ = newParameters;
-    if (Parameters_.ResetOnNewParameters) {
-        Reset();
-    }
-}
-
-void THistoricUsageAggregator::Reset()
-{
-    ExponentialMovingAverage_ = 0.0;
-    LastExponentialMovingAverageUpdateTime_ = TInstant::Zero();
-}
-
-void THistoricUsageAggregator::UpdateAt(TInstant now, double value)
-{
-    if (now < LastExponentialMovingAverageUpdateTime_) {
-        return;
-    }
-
-    ExponentialMovingAverage_ = ApplyUpdate(ExponentialMovingAverage_, now, value);
-    LastExponentialMovingAverageUpdateTime_ = now;
-}
-
-double THistoricUsageAggregator::SimulateUpdate(TInstant now, double value) const
-{
-    auto simulatedValue = GetHistoricUsage();
-
-    if (now < LastExponentialMovingAverageUpdateTime_) {
-        return simulatedValue;
-    }
-
-    return ApplyUpdate(simulatedValue, now, value);
-}
-
-double THistoricUsageAggregator::GetHistoricUsage() const
-{
-    return ExponentialMovingAverage_;
-}
-
-bool THistoricUsageAggregator::ShouldFlush() const
-{
-    return
-        Parameters_.Mode == EHistoricUsageAggregationMode::None ||
-        Parameters_.EmaAlpha == 0.0 ||
-        LastExponentialMovingAverageUpdateTime_ == TInstant::Zero();
-}
-
-double THistoricUsageAggregator::ApplyUpdate(double current, TInstant now, double value) const
-{
-    if (ShouldFlush()) {
-        current = value;
-    } else {
-        auto sinceLast = now - LastExponentialMovingAverageUpdateTime_;
-        auto w = Exp2(-1. * Parameters_.EmaAlpha * sinceLast.SecondsFloat());
-        current = w * ExponentialMovingAverage_ + (1 - w) * value;
-    }
-    return current;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TAverageHistoricUsageAggregator::TAverageHistoricUsageAggregator(TDuration period)
-    : Period_(period)
-{ }
-
-void TAverageHistoricUsageAggregator::UpdateParameters(THistoricUsageAggregationParameters params)
-{
-    HistoricUsageAggregator_.UpdateParameters(params);
-}
-
-double TAverageHistoricUsageAggregator::GetHistoricUsage()
-{
-    auto now = NProfiling::GetInstant();
-    MaybeFlush(now);
-    return HistoricUsageAggregator_.GetHistoricUsage();
-}
-
-void TAverageHistoricUsageAggregator::UpdateAt(TInstant now, double value)
-{
-    MaybeFlush(now);
-    CurrentUsage_ += value;
-}
-
-void TAverageHistoricUsageAggregator::MaybeFlush(TInstant now)
-{
-    if (!IntervalStart_ || now < IntervalStart_) {
-        IntervalStart_ = now;
-        return;
-    }
-
-    auto diff = now - IntervalStart_;
-    if (diff < Period_) {
-        return;
-    }
-
-    auto ratio = diff / Period_;
-    auto usagePerPeriod = CurrentUsage_ / ratio;
-
-    HistoricUsageAggregator_.UpdateAt(now, usagePerPeriod);
-
-    IntervalStart_ = now;
-    CurrentUsage_ = 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT

+ 0 - 97
yt/yt/core/misc/historic_usage_aggregator.h

@@ -1,97 +0,0 @@
-#pragma once
-
-#include "config.h"
-
-#include <util/datetime/base.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct THistoricUsageAggregationParameters
-{
-    THistoricUsageAggregationParameters() = default;
-    explicit THistoricUsageAggregationParameters(
-        EHistoricUsageAggregationMode mode,
-        double emaAlpha = 0.0,
-        bool resetOnNewParameters = true);
-    explicit THistoricUsageAggregationParameters(const THistoricUsageConfigPtr& config);
-
-    bool operator==(const THistoricUsageAggregationParameters& other) const = default;
-
-    static constexpr double DefaultEmaAlpha = 0.1;
-
-    EHistoricUsageAggregationMode Mode = EHistoricUsageAggregationMode::None;
-
-    //! Parameter of exponential moving average (EMA) of the aggregated usage.
-    //! Roughly speaking, it means that current usage ratio is twice as relevant for the
-    //! historic usage as the usage ratio alpha seconds ago.
-    //! EMA for unevenly spaced time series was adapted from here: https://clck.ru/HaGZs
-    double EmaAlpha = 0.0;
-
-    bool ResetOnNewParameters = true;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class THistoricUsageAggregator
-{
-public:
-    THistoricUsageAggregator();
-    explicit THistoricUsageAggregator(const THistoricUsageAggregationParameters& parameters);
-    THistoricUsageAggregator(const THistoricUsageAggregator& other) = default;
-    THistoricUsageAggregator& operator=(const THistoricUsageAggregator& other) = default;
-
-    //! Update the parameters. If the parameters have changed, resets the state.
-    void UpdateParameters(const THistoricUsageAggregationParameters& newParameters);
-
-    void Reset();
-
-    void UpdateAt(TInstant now, double value);
-
-    double GetHistoricUsage() const;
-
-    //! Simulates combination of UpdateAt(now, value) + GetHistoricUsage without changing the state
-    double SimulateUpdate(TInstant now, double value) const;
-
-private:
-    THistoricUsageAggregationParameters Parameters_;
-
-    double ExponentialMovingAverage_;
-
-    TInstant LastExponentialMovingAverageUpdateTime_;
-
-    bool ShouldFlush() const;
-
-    double ApplyUpdate(double current, TInstant now, double value) const;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TAverageHistoricUsageAggregator
-{
-public:
-    explicit TAverageHistoricUsageAggregator(TDuration period = TDuration::Seconds(1));
-    TAverageHistoricUsageAggregator(const TAverageHistoricUsageAggregator& other) = default;
-    TAverageHistoricUsageAggregator& operator=(const TAverageHistoricUsageAggregator& other) = default;
-
-    void UpdateParameters(THistoricUsageAggregationParameters params);
-
-    double GetHistoricUsage();
-
-    void UpdateAt(TInstant now, double value);
-
-private:
-    TDuration Period_;
-
-    TInstant IntervalStart_ = TInstant::Zero();
-    double CurrentUsage_ = 0;
-
-    THistoricUsageAggregator HistoricUsageAggregator_;
-
-    void MaybeFlush(TInstant now);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT

Some files were not shown because too many files changed in this diff