Browse Source

YQ-842 Move IRetryPolicy from PQ SDK to library

ref:6f3f663c57e22b6d97bfc34fb795be48ad8e6ec0
Vasily Gerasimov 3 years ago
parent
commit
91baa58196

+ 289 - 0
library/cpp/retry/retry_policy.h

@@ -0,0 +1,289 @@
+#pragma once
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/generic/typetraits.h>
+#include <util/random/random.h>
+
+#include <functional>
+#include <limits>
+#include <memory>
+
+//! Retry policy.
+//! Calculates delay before next retry (if any).
+//! Has several default implementations:
+//! - exponential backoff policy;
+//! - retries with fixed interval;
+//! - no retries.
+
+enum class ERetryErrorClass {
+    // This error shouldn't be retried.
+    NoRetry,
+
+    // This error could be retried in short period of time.
+    ShortRetry,
+
+    // This error requires waiting before it could be retried.
+    LongRetry,
+};
+
+template <class... TArgs>
+struct IRetryPolicy {
+    using TPtr = std::shared_ptr<IRetryPolicy>;
+
+    using TRetryClassFunction = std::function<ERetryErrorClass(typename TTypeTraits<TArgs>::TFuncParam...)>;
+
+    //! Retry state of single request.
+    struct IRetryState {
+        using TPtr = std::unique_ptr<IRetryState>;
+
+        virtual ~IRetryState() = default;
+
+        //! Calculate delay before next retry if next retry is allowed.
+        //! Returns empty maybe if retry is not allowed anymore.
+        virtual TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) = 0;
+    };
+
+    virtual ~IRetryPolicy() = default;
+
+    //! Function that is called after first error
+    //! to find out a futher retry behaviour.
+    //! Retry state is expected to be created for the whole single retry session.
+    virtual typename IRetryState::TPtr CreateRetryState() const = 0;
+
+    //!
+    //! Default implementations.
+    //!
+
+    static TPtr GetNoRetryPolicy(); // Denies all kind of retries.
+
+    //! Randomized exponential backoff policy.
+    static TPtr GetExponentialBackoffPolicy(TRetryClassFunction retryClassFunction,
+                                            TDuration minDelay = TDuration::MilliSeconds(10),
+                                            // Delay for statuses that require waiting before retry (such as OVERLOADED).
+                                            TDuration minLongRetryDelay = TDuration::MilliSeconds(200),
+                                            TDuration maxDelay = TDuration::Seconds(30),
+                                            size_t maxRetries = std::numeric_limits<size_t>::max(),
+                                            TDuration maxTime = TDuration::Max(),
+                                            double scaleFactor = 2.0);
+
+    //! Randomized fixed interval policy.
+    static TPtr GetFixedIntervalPolicy(TRetryClassFunction retryClassFunction,
+                                       TDuration delay = TDuration::MilliSeconds(100),
+                                       // Delay for statuses that require waiting before retry (such as OVERLOADED).
+                                       TDuration longRetryDelay = TDuration::MilliSeconds(300),
+                                       size_t maxRetries = std::numeric_limits<size_t>::max(),
+                                       TDuration maxTime = TDuration::Max());
+};
+
+template <class... TArgs>
+struct TNoRetryPolicy : IRetryPolicy<TArgs...> {
+    using IRetryState = typename IRetryPolicy<TArgs...>::IRetryState;
+
+    struct TNoRetryState : IRetryState {
+        TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam...) override {
+            return Nothing();
+        }
+    };
+
+    typename IRetryState::TPtr CreateRetryState() const override {
+        return std::make_unique<TNoRetryState>();
+    }
+};
+
+namespace NRetryDetails {
+inline TDuration RandomizeDelay(TDuration baseDelay) {
+    const TDuration::TValue half = baseDelay.GetValue() / 2;
+    return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
+}
+} // namespace NRetryDetails
+
+template <class... TArgs>
+struct TExponentialBackoffPolicy : IRetryPolicy<TArgs...> {
+    using IRetryPolicy = IRetryPolicy<TArgs...>;
+    using IRetryState = typename IRetryPolicy::IRetryState;
+
+    struct TExponentialBackoffState : IRetryState {
+        TExponentialBackoffState(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+                                 TDuration minDelay,
+                                 TDuration minLongRetryDelay,
+                                 TDuration maxDelay,
+                                 size_t maxRetries,
+                                 TDuration maxTime,
+                                 double scaleFactor)
+            : MinLongRetryDelay(minLongRetryDelay)
+            , MaxDelay(maxDelay)
+            , MaxRetries(maxRetries)
+            , MaxTime(maxTime)
+            , ScaleFactor(scaleFactor)
+            , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
+            , CurrentDelay(minDelay)
+            , AttemptsDone(0)
+            , RetryClassFunction(std::move(retryClassFunction))
+        {
+        }
+
+        TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
+            const ERetryErrorClass errorClass = RetryClassFunction(args...);
+            if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) {
+                return Nothing();
+            }
+
+            if (errorClass == ERetryErrorClass::LongRetry) {
+                CurrentDelay = Max(CurrentDelay, MinLongRetryDelay);
+            }
+
+            const TDuration delay = NRetryDetails::RandomizeDelay(CurrentDelay);
+
+            if (CurrentDelay < MaxDelay) {
+                CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay);
+            }
+
+            ++AttemptsDone;
+            return delay;
+        }
+
+        const TDuration MinLongRetryDelay;
+        const TDuration MaxDelay;
+        const size_t MaxRetries;
+        const TDuration MaxTime;
+        const double ScaleFactor;
+        const TInstant StartTime;
+        TDuration CurrentDelay;
+        size_t AttemptsDone;
+        typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+    };
+
+    TExponentialBackoffPolicy(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+                              TDuration minDelay,
+                              TDuration minLongRetryDelay,
+                              TDuration maxDelay,
+                              size_t maxRetries,
+                              TDuration maxTime,
+                              double scaleFactor)
+        : MinDelay(minDelay)
+        , MinLongRetryDelay(minLongRetryDelay)
+        , MaxDelay(maxDelay)
+        , MaxRetries(maxRetries)
+        , MaxTime(maxTime)
+        , ScaleFactor(scaleFactor)
+        , RetryClassFunction(std::move(retryClassFunction))
+    {
+        Y_ASSERT(RetryClassFunction);
+        Y_ASSERT(MinDelay < MaxDelay);
+        Y_ASSERT(MinLongRetryDelay < MaxDelay);
+        Y_ASSERT(MinLongRetryDelay >= MinDelay);
+        Y_ASSERT(ScaleFactor > 1.0);
+        Y_ASSERT(MaxRetries > 0);
+        Y_ASSERT(MaxTime > MinDelay);
+    }
+
+    typename IRetryState::TPtr CreateRetryState() const override {
+        return std::make_unique<TExponentialBackoffState>(RetryClassFunction, MinDelay, MinLongRetryDelay, MaxDelay, MaxRetries, MaxTime, ScaleFactor);
+    }
+
+    const TDuration MinDelay;
+    const TDuration MinLongRetryDelay;
+    const TDuration MaxDelay;
+    const size_t MaxRetries;
+    const TDuration MaxTime;
+    const double ScaleFactor;
+    typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+};
+
+template <class... TArgs>
+struct TFixedIntervalPolicy : IRetryPolicy<TArgs...> {
+    using IRetryPolicy = IRetryPolicy<TArgs...>;
+    using IRetryState = typename IRetryPolicy::IRetryState;
+
+    struct TFixedIntervalState : IRetryState {
+        TFixedIntervalState(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+                            TDuration delay,
+                            TDuration longRetryDelay,
+                            size_t maxRetries,
+                            TDuration maxTime)
+            : Delay(delay)
+            , LongRetryDelay(longRetryDelay)
+            , MaxRetries(maxRetries)
+            , MaxTime(maxTime)
+            , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
+            , AttemptsDone(0)
+            , RetryClassFunction(std::move(retryClassFunction))
+        {
+        }
+
+        TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
+            const ERetryErrorClass errorClass = RetryClassFunction(args...);
+            if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) {
+                return Nothing();
+            }
+
+            const TDuration delay = NRetryDetails::RandomizeDelay(errorClass == ERetryErrorClass::LongRetry ? LongRetryDelay : Delay);
+
+            ++AttemptsDone;
+            return delay;
+        }
+
+        const TDuration Delay;
+        const TDuration LongRetryDelay;
+        const size_t MaxRetries;
+        const TDuration MaxTime;
+        const TInstant StartTime;
+        size_t AttemptsDone;
+        typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+    };
+
+    TFixedIntervalPolicy(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+                         TDuration delay,
+                         TDuration longRetryDelay,
+                         size_t maxRetries,
+                         TDuration maxTime)
+        : Delay(delay)
+        , LongRetryDelay(longRetryDelay)
+        , MaxRetries(maxRetries)
+        , MaxTime(maxTime)
+        , RetryClassFunction(std::move(retryClassFunction))
+    {
+        Y_ASSERT(RetryClassFunction);
+        Y_ASSERT(MaxRetries > 0);
+        Y_ASSERT(MaxTime > Delay);
+        Y_ASSERT(MaxTime > LongRetryDelay);
+        Y_ASSERT(LongRetryDelay >= Delay);
+    }
+
+    typename IRetryState::TPtr CreateRetryState() const override {
+        return std::make_unique<TFixedIntervalState>(RetryClassFunction, Delay, LongRetryDelay, MaxRetries, MaxTime);
+    }
+
+    const TDuration Delay;
+    const TDuration LongRetryDelay;
+    const size_t MaxRetries;
+    const TDuration MaxTime;
+    typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+};
+
+template <class... TArgs>
+typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetNoRetryPolicy() {
+    return std::make_shared<TNoRetryPolicy<TArgs...>>();
+}
+
+template <class... TArgs>
+typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetExponentialBackoffPolicy(TRetryClassFunction retryClassFunction,
+                                                                                          TDuration minDelay,
+                                                                                          TDuration minLongRetryDelay,
+                                                                                          TDuration maxDelay,
+                                                                                          size_t maxRetries,
+                                                                                          TDuration maxTime,
+                                                                                          double scaleFactor)
+{
+    return std::make_shared<TExponentialBackoffPolicy<TArgs...>>(std::move(retryClassFunction), minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor);
+}
+
+template <class... TArgs>
+typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetFixedIntervalPolicy(TRetryClassFunction retryClassFunction,
+                                                                                     TDuration delay,
+                                                                                     TDuration longRetryDelay,
+                                                                                     size_t maxRetries,
+                                                                                     TDuration maxTime)
+{
+    return std::make_shared<TFixedIntervalPolicy<TArgs...>>(std::move(retryClassFunction), delay, longRetryDelay, maxRetries, maxTime);
+}

+ 108 - 0
library/cpp/retry/retry_policy_ut.cpp

@@ -0,0 +1,108 @@
+#include "retry_policy.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(RetryPolicy) {
+    Y_UNIT_TEST(NoRetryPolicy) {
+        auto policy = IRetryPolicy<int>::GetNoRetryPolicy();
+        UNIT_ASSERT(!policy->CreateRetryState()->GetNextRetryDelay(42));
+    }
+
+    using ITestPolicy = IRetryPolicy<ERetryErrorClass>;
+
+    ERetryErrorClass ErrorClassFunction(ERetryErrorClass err) {
+        return err;
+    }
+
+#define ASSERT_INTERVAL(from, to, val) {        \
+        auto v = val;                           \
+        UNIT_ASSERT(v);                         \
+        UNIT_ASSERT_GE_C(*v, from, *v);         \
+        UNIT_ASSERT_LE_C(*v, to, *v);           \
+    }
+
+    Y_UNIT_TEST(FixedIntervalPolicy) {
+        auto policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::Seconds(100));
+        auto state = policy->CreateRetryState();
+        for (int i = 0; i < 5; ++i) {
+            ASSERT_INTERVAL(TDuration::MilliSeconds(50), TDuration::MilliSeconds(100), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+            ASSERT_INTERVAL(TDuration::Seconds(50), TDuration::Seconds(100), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+            UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::NoRetry));
+        }
+    }
+
+    Y_UNIT_TEST(ExponentialBackoffPolicy) {
+        auto policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::Seconds(100), TDuration::Seconds(500));
+        auto state = policy->CreateRetryState();
+
+        // Step 1
+        ASSERT_INTERVAL(TDuration::MilliSeconds(50), TDuration::MilliSeconds(100), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+
+        // Step 2
+        ASSERT_INTERVAL(TDuration::Seconds(50), TDuration::Seconds(100), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+
+        // Step 3
+        ASSERT_INTERVAL(TDuration::Seconds(100), TDuration::Seconds(200), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+
+        // Step 4
+        ASSERT_INTERVAL(TDuration::Seconds(200), TDuration::Seconds(400), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+
+        // Step 5. Max delay
+        ASSERT_INTERVAL(TDuration::Seconds(250), TDuration::Seconds(500), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+        ASSERT_INTERVAL(TDuration::Seconds(250), TDuration::Seconds(500), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+
+        // No retry
+        UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::NoRetry));
+    }
+
+    void TestMaxRetries(bool exponentialBackoff) {
+        ITestPolicy::TPtr policy;
+        if (exponentialBackoff) {
+            policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 3);
+        } else {
+            policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::MilliSeconds(300), 3);
+        }
+        auto state = policy->CreateRetryState();
+        UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+        UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+        UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+        UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+        UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+    }
+
+    void TestMaxTime(bool exponentialBackoff) {
+        ITestPolicy::TPtr policy;
+        const TDuration maxDelay = TDuration::Seconds(2);
+        if (exponentialBackoff) {
+            policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 100500, maxDelay);
+        } else {
+            policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::MilliSeconds(300), 100500, maxDelay);
+        }
+        const TInstant start = TInstant::Now();
+        auto state = policy->CreateRetryState();
+        for (int i = 0; i < 3; ++i) {
+            auto delay = state->GetNextRetryDelay(ERetryErrorClass::ShortRetry);
+            const TInstant now = TInstant::Now();
+            UNIT_ASSERT(delay || now - start >= maxDelay);
+        }
+        Sleep(maxDelay);
+        UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+        UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+    }
+
+    Y_UNIT_TEST(MaxRetriesExponentialBackoff) {
+        TestMaxRetries(true);
+    }
+
+    Y_UNIT_TEST(MaxRetriesFixedInterval) {
+        TestMaxRetries(false);
+    }
+
+    Y_UNIT_TEST(MaxTimeExponentialBackoff) {
+        TestMaxTime(true);
+    }
+
+    Y_UNIT_TEST(MaxTimeFixedInterval) {
+        TestMaxTime(false);
+    }
+}

+ 1 - 0
library/cpp/retry/ut/ya.make

@@ -7,6 +7,7 @@ OWNER(
 )
 
 SRCS(
+    retry_policy_ut.cpp
     retry_ut.cpp
 )
 

+ 2 - 2
ydb/core/yq/libs/read_rule/read_rule_creator.cpp

@@ -136,7 +136,7 @@ public:
             if (!RetryState) {
                 RetryState = NYdb::NPersQueue::IRetryPolicy::GetExponentialBackoffPolicy()->CreateRetryState();
             }
-            TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status);
+            TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status.GetStatus());
             if (status.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
                 nextRetryDelay = Nothing(); // Not retryable
             }
@@ -198,7 +198,7 @@ private:
     NYdb::TDriver YdbDriver;
     NYdb::NPersQueue::TPersQueueClient PqClient;
     ui64 Index = 0;
-    NYdb::NPersQueue::IRetryState::TPtr RetryState;
+    NYdb::NPersQueue::IRetryPolicy::IRetryState::TPtr RetryState;
     bool RequestInFlight = false;
     bool Finishing = false;
 };

+ 2 - 2
ydb/core/yq/libs/read_rule/read_rule_deleter.cpp

@@ -129,7 +129,7 @@ public:
                         2.0 // scaleFactor
                     )->CreateRetryState();
             }
-            TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status);
+            TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status.GetStatus());
             if (status.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
                 nextRetryDelay = Nothing(); // No topic => OK. Leave just transient issues.
             }
@@ -173,7 +173,7 @@ private:
     NYdb::NPersQueue::TPersQueueClient PqClient;
     ui64 Index = 0;
     const size_t MaxRetries;
-    NYdb::NPersQueue::IRetryState::TPtr RetryState;
+    NYdb::NPersQueue::IRetryPolicy::IRetryState::TPtr RetryState;
 };
 
 // Actor for deletion of read rules for all topics in the query.

+ 6 - 6
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp

@@ -4,7 +4,7 @@
 
 namespace NYdb::NPersQueue {
 
-IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
+ERetryErrorClass GetRetryErrorClass(EStatus status) {
     switch (status) {
     case EStatus::SUCCESS:
     case EStatus::INTERNAL_ERROR:
@@ -19,7 +19,7 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
     case EStatus::CLIENT_INTERNAL_ERROR:
     case EStatus::CLIENT_CANCELLED:
     case EStatus::CLIENT_OUT_OF_RANGE:
-        return IRetryPolicy::ERetryErrorClass::ShortRetry;
+        return ERetryErrorClass::ShortRetry;
 
     case EStatus::OVERLOADED:
     case EStatus::TIMEOUT:
@@ -28,7 +28,7 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
     case EStatus::CLIENT_DEADLINE_EXCEEDED:
     case EStatus::CLIENT_LIMITS_REACHED:
     case EStatus::CLIENT_DISCOVERY_FAILED:
-        return IRetryPolicy::ERetryErrorClass::LongRetry;
+        return ERetryErrorClass::LongRetry;
 
     case EStatus::SCHEME_ERROR:
     case EStatus::STATUS_UNDEFINED:
@@ -40,14 +40,14 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
     case EStatus::NOT_FOUND:
     case EStatus::CLIENT_UNAUTHENTICATED:
     case EStatus::CLIENT_CALL_UNIMPLEMENTED:
-        return IRetryPolicy::ERetryErrorClass::NoRetry;
+        return ERetryErrorClass::NoRetry;
     }
 }
 
-IRetryPolicy::ERetryErrorClass GetRetryErrorClassV2(EStatus status) {
+ERetryErrorClass GetRetryErrorClassV2(EStatus status) {
     switch (status) {
         case EStatus::SCHEME_ERROR:
-            return IRetryPolicy::ERetryErrorClass::NoRetry;
+            return ERetryErrorClass::NoRetry;
         default:
             return GetRetryErrorClass(status);
 

+ 2 - 2
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h

@@ -11,8 +11,8 @@
 
 namespace NYdb::NPersQueue {
 
-IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status);
-IRetryPolicy::ERetryErrorClass GetRetryErrorClassV2(EStatus status);
+ERetryErrorClass GetRetryErrorClass(EStatus status);
+ERetryErrorClass GetRetryErrorClassV2(EStatus status);
 
 void Cancel(NGrpc::IQueueClientContextPtr& context);
 

+ 5 - 181
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp

@@ -170,189 +170,13 @@ TAsyncDescribeTopicResult TPersQueueClient::DescribeTopic(const TString& path, c
     return Impl_->DescribeTopic(path, settings);
 }
 
-namespace {
-
-struct TNoRetryState : IRetryState {
-    TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override {
-        return Nothing();
-    }
-};
-
-struct TNoRetryPolicy : IRetryPolicy {
-    IRetryState::TPtr CreateRetryState() const override {
-        return std::make_unique<TNoRetryState>();
-    }
-};
-
-TDuration RandomizeDelay(TDuration baseDelay) {
-    const TDuration::TValue half = baseDelay.GetValue() / 2;
-    return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
-}
-
-struct TExponentialBackoffState : IRetryState {
-    TExponentialBackoffState(TDuration minDelay,
-                             TDuration minLongRetryDelay,
-                             TDuration maxDelay,
-                             size_t maxRetries,
-                             TDuration maxTime,
-                             double scaleFactor,
-                             std::function<IRetryPolicy::ERetryErrorClass(EStatus)> retryErrorClassFunction)
-        : MinLongRetryDelay(minLongRetryDelay)
-        , MaxDelay(maxDelay)
-        , MaxRetries(maxRetries)
-        , MaxTime(maxTime)
-        , ScaleFactor(scaleFactor)
-        , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
-        , CurrentDelay(minDelay)
-        , AttemptsDone(0)
-        , RetryErrorClassFunction(retryErrorClassFunction)
-    {
-    }
-
-    TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) override {
-        const IRetryPolicy::ERetryErrorClass errorClass = RetryErrorClassFunction(status.GetStatus());
-        if (AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime || errorClass == IRetryPolicy::ERetryErrorClass::NoRetry) {
-            return Nothing();
-        }
-
-        if (errorClass == IRetryPolicy::ERetryErrorClass::LongRetry) {
-            CurrentDelay = Max(CurrentDelay, MinLongRetryDelay);
-        }
-
-        const TDuration delay = RandomizeDelay(CurrentDelay);
-
-        if (CurrentDelay < MaxDelay) {
-            CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay);
-        }
-
-        ++AttemptsDone;
-        return delay;
-    }
-
-    const TDuration MinLongRetryDelay;
-    const TDuration MaxDelay;
-    const size_t MaxRetries;
-    const TDuration MaxTime;
-    const double ScaleFactor;
-    const TInstant StartTime;
-    TDuration CurrentDelay;
-    size_t AttemptsDone;
-    std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-struct TExponentialBackoffPolicy : IRetryPolicy {
-    TExponentialBackoffPolicy(TDuration minDelay,
-                              TDuration minLongRetryDelay,
-                              TDuration maxDelay,
-                              size_t maxRetries,
-                              TDuration maxTime,
-                              double scaleFactor,
-                              std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
-        : MinDelay(minDelay)
-        , MinLongRetryDelay(minLongRetryDelay)
-        , MaxDelay(maxDelay)
-        , MaxRetries(maxRetries)
-        , MaxTime(maxTime)
-        , ScaleFactor(scaleFactor)
-        , RetryErrorClassFunction(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass)
-    {
-        Y_ASSERT(MinDelay < MaxDelay);
-        Y_ASSERT(MinLongRetryDelay < MaxDelay);
-        Y_ASSERT(MinLongRetryDelay >= MinDelay);
-        Y_ASSERT(ScaleFactor > 1.0);
-        Y_ASSERT(MaxRetries > 0);
-        Y_ASSERT(MaxTime > MinDelay);
-    }
-
-    IRetryState::TPtr CreateRetryState() const override {
-        return std::make_unique<TExponentialBackoffState>(MinDelay, MinLongRetryDelay, MaxDelay, MaxRetries, MaxTime, ScaleFactor, RetryErrorClassFunction);
-    }
-
-    const TDuration MinDelay;
-    const TDuration MinLongRetryDelay;
-    const TDuration MaxDelay;
-    const size_t MaxRetries;
-    const TDuration MaxTime;
-    const double ScaleFactor;
-    std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-struct TFixedIntervalState : IRetryState {
-    TFixedIntervalState(TDuration delay,
-                        TDuration longRetryDelay,
-                        size_t maxRetries,
-                        TDuration maxTime,
-                        std::function<IRetryPolicy::ERetryErrorClass(EStatus)> retryErrorClassFunction)
-        : Delay(delay)
-        , LongRetryDelay(longRetryDelay)
-        , MaxRetries(maxRetries)
-        , MaxTime(maxTime)
-        , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
-        , AttemptsDone(0)
-        , RetryErrorClassFunction(retryErrorClassFunction)
-    {
-    }
-
-    TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) override {
-        const IRetryPolicy::ERetryErrorClass errorClass = RetryErrorClassFunction(status.GetStatus());
-        if (AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime || errorClass == IRetryPolicy::ERetryErrorClass::NoRetry) {
-            return Nothing();
-        }
-
-        const TDuration delay = RandomizeDelay(errorClass == IRetryPolicy::ERetryErrorClass::LongRetry ? LongRetryDelay : Delay);
-
-        ++AttemptsDone;
-        return delay;
-    }
-
-    const TDuration Delay;
-    const TDuration LongRetryDelay;
-    const size_t MaxRetries;
-    const TDuration MaxTime;
-    const TInstant StartTime;
-    size_t AttemptsDone;
-    std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-struct TFixedIntervalPolicy : IRetryPolicy {
-    TFixedIntervalPolicy(TDuration delay,
-                         TDuration longRetryDelay,
-                         size_t maxRetries,
-                         TDuration maxTime,
-                         std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
-        : Delay(delay)
-        , LongRetryDelay(longRetryDelay)
-        , MaxRetries(maxRetries)
-        , MaxTime(maxTime)
-        , RetryErrorClassFunction(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass)
-    {
-        Y_ASSERT(MaxRetries > 0);
-        Y_ASSERT(MaxTime > Delay);
-        Y_ASSERT(MaxTime > LongRetryDelay);
-        Y_ASSERT(LongRetryDelay >= Delay);
-    }
-
-    IRetryState::TPtr CreateRetryState() const override {
-        return std::make_unique<TFixedIntervalState>(Delay, LongRetryDelay, MaxRetries, MaxTime, RetryErrorClassFunction);
-    }
-
-    const TDuration Delay;
-    const TDuration LongRetryDelay;
-    const size_t MaxRetries;
-    const TDuration MaxTime;
-    std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-} // namespace
-
 IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() {
     static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy();
     return policy;
 }
 
 IRetryPolicy::TPtr IRetryPolicy::GetNoRetryPolicy() {
-    static IRetryPolicy::TPtr policy = std::make_shared<TNoRetryPolicy>();
-    return policy;
+    return ::IRetryPolicy<EStatus>::GetNoRetryPolicy();
 }
 
 IRetryPolicy::TPtr IRetryPolicy::GetExponentialBackoffPolicy(TDuration minDelay,
@@ -361,18 +185,18 @@ IRetryPolicy::TPtr IRetryPolicy::GetExponentialBackoffPolicy(TDuration minDelay,
                                                              size_t maxRetries,
                                                              TDuration maxTime,
                                                              double scaleFactor,
-                                                             std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
+                                                             std::function<ERetryErrorClass(EStatus)> customRetryClassFunction)
 {
-    return std::make_shared<TExponentialBackoffPolicy>(minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor, customRetryClassFunction);
+    return ::IRetryPolicy<EStatus>::GetExponentialBackoffPolicy(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass, minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor);
 }
 
 IRetryPolicy::TPtr IRetryPolicy::GetFixedIntervalPolicy(TDuration delay,
                                                         TDuration longRetryDelay,
                                                         size_t maxRetries,
                                                         TDuration maxTime,
-                                                        std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
+                                                        std::function<ERetryErrorClass(EStatus)> customRetryClassFunction)
 {
-    return std::make_shared<TFixedIntervalPolicy>(delay, longRetryDelay, maxRetries, maxTime, customRetryClassFunction);
+    return ::IRetryPolicy<EStatus>::GetFixedIntervalPolicy(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass, delay, longRetryDelay, maxRetries, maxTime);
 }
 
 std::shared_ptr<IReadSession> TPersQueueClient::CreateReadSession(const TReadSessionSettings& settings) {

+ 2 - 2
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp

@@ -233,7 +233,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
             if (!ClusterDiscoveryRetryState) {
                 ClusterDiscoveryRetryState = Settings.RetryPolicy_->CreateRetryState();
             }
-            TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status);
+            TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
             if (retryDelay) {
                 Log << TLOG_INFO << "Cluster discovery request failed. Status: " << status.GetStatus()
                                    << ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\"";
@@ -724,7 +724,7 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) {
         ServerMessage = std::make_shared<Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>();
         ++ConnectionGeneration;
         if (RetryState) {
-            TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(TPlainStatus(status));
+            TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status);
             if (nextDelay) {
                 delay = *nextDelay;
                 delayContext = ClientContext->CreateContext();

+ 2 - 2
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h

@@ -970,7 +970,7 @@ private:
     size_t ConnectionGeneration = 0;
     TAdaptiveLock Lock;
     IProcessor::TPtr Processor;
-    IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
+    IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
     size_t ConnectionAttemptsDone = 0;
 
     // Memory usage.
@@ -1103,7 +1103,7 @@ private:
     std::shared_ptr<TReadSessionEventsQueue> EventsQueue;
     THashMap<TString, TClusterSessionInfo> ClusterSessions; // Cluster name (in lower case) -> TClusterSessionInfo
     NGrpc::IQueueClientContextPtr ClusterDiscoveryDelayContext;
-    IRetryState::TPtr ClusterDiscoveryRetryState;
+    IRetryPolicy::IRetryState::TPtr ClusterDiscoveryRetryState;
     bool DataReadingSuspended = false;
 
     NGrpc::IQueueClientContextPtr DumpCountersContext;

Some files were not shown because too many files changed in this diff