Browse Source

Restoring authorship annotation for <lexeyo@yandex-team.ru>. Commit 2 of 2.

lexeyo 3 years ago
parent
commit
c0a1bd5a47

+ 35 - 35
library/cpp/threading/future/core/future-inl.h

@@ -506,32 +506,32 @@ namespace NThreading {
 
     ////////////////////////////////////////////////////////////////////////////////
 
-    class TFutureStateId { 
-    private: 
-        const void* Id; 
- 
-    public: 
-        template <typename T> 
-        explicit TFutureStateId(const NImpl::TFutureState<T>& state) 
-            : Id(&state) 
-        { 
-        } 
- 
-        const void* Value() const noexcept { 
-            return Id; 
-        } 
-    }; 
- 
-    inline bool operator==(const TFutureStateId& l, const TFutureStateId& r) { 
-        return l.Value() == r.Value(); 
-    } 
- 
-    inline bool operator!=(const TFutureStateId& l, const TFutureStateId& r) { 
-        return !(l == r); 
-    } 
- 
-    //////////////////////////////////////////////////////////////////////////////// 
- 
+    class TFutureStateId {
+    private:
+        const void* Id;
+
+    public:
+        template <typename T>
+        explicit TFutureStateId(const NImpl::TFutureState<T>& state)
+            : Id(&state)
+        {
+        }
+
+        const void* Value() const noexcept {
+            return Id;
+        }
+    };
+
+    inline bool operator==(const TFutureStateId& l, const TFutureStateId& r) {
+        return l.Value() == r.Value();
+    }
+
+    inline bool operator!=(const TFutureStateId& l, const TFutureStateId& r) {
+        return !(l == r);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////
+
     template <typename T>
     inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept
         : State(state)
@@ -642,11 +642,11 @@ namespace NThreading {
     }
 
     template <typename T>
-    inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept { 
-        return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); 
-    } 
- 
-    template <typename T> 
+    inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept {
+        return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
+    }
+
+    template <typename T>
     inline void TFuture<T>::EnsureInitialized() const {
         if (!State) {
             ythrow TFutureException() << "state not initialized";
@@ -745,10 +745,10 @@ namespace NThreading {
         return bool(State);
     }
 
-    inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept { 
-        return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing(); 
-    } 
- 
+    inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept {
+        return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();
+    }
+
     inline void TFuture<void>::EnsureInitialized() const {
         if (!State) {
             ythrow TFutureException() << "state not initialized";

+ 15 - 15
library/cpp/threading/future/core/future.h

@@ -4,7 +4,7 @@
 
 #include <util/datetime/base.h>
 #include <util/generic/function.h>
-#include <util/generic/maybe.h> 
+#include <util/generic/maybe.h>
 #include <util/generic/ptr.h>
 #include <util/generic/vector.h>
 #include <util/generic/yexception.h>
@@ -61,9 +61,9 @@ namespace NThreading {
     template <typename F, typename T>
     using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType;
 
-    //! Type of the future/promise state identifier 
-    class TFutureStateId; 
- 
+    //! Type of the future/promise state identifier
+    class TFutureStateId;
+
     ////////////////////////////////////////////////////////////////////////////////
 
     template <typename T>
@@ -113,11 +113,11 @@ namespace NThreading {
 
         TFuture<void> IgnoreResult() const;
 
-        //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional 
-        /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death 
-        **/ 
-        TMaybe<TFutureStateId> StateId() const noexcept; 
- 
+        //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional
+        /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death
+        **/
+        TMaybe<TFutureStateId> StateId() const noexcept;
+
         void EnsureInitialized() const;
     };
 
@@ -172,12 +172,12 @@ namespace NThreading {
         TFuture<void> IgnoreResult() const {
             return *this;
         }
- 
-        //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional 
-        /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death 
-        **/ 
-        TMaybe<TFutureStateId> StateId() const noexcept; 
- 
+
+        //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional
+        /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death
+        **/
+        TMaybe<TFutureStateId> StateId() const noexcept;
+
         void EnsureInitialized() const;
     };
 

+ 36 - 36
library/cpp/threading/future/future_ut.cpp

@@ -3,12 +3,12 @@
 #include <library/cpp/testing/unittest/registar.h>
 
 #include <list>
-#include <type_traits> 
+#include <type_traits>
 
 namespace NThreading {
- 
-namespace { 
- 
+
+namespace {
+
     class TCopyCounter {
     public:
         TCopyCounter(size_t* numCopies)
@@ -35,33 +35,33 @@ namespace {
         size_t* NumCopies = nullptr;
     };
 
-    template <typename T> 
-    auto MakePromise() { 
-        if constexpr (std::is_same_v<T, void>) { 
-            return NewPromise(); 
-        } 
-        return NewPromise<T>(); 
-    } 
- 
- 
-    template <typename T> 
-    void TestFutureStateId() { 
-        TFuture<T> empty; 
-        UNIT_ASSERT(!empty.StateId().Defined()); 
-        auto promise1 = MakePromise<T>(); 
-        auto future11 = promise1.GetFuture(); 
-        UNIT_ASSERT(future11.StateId().Defined()); 
-        auto future12 = promise1.GetFuture(); 
-        UNIT_ASSERT_EQUAL(future11.StateId(), future11.StateId()); // same result for subsequent invocations 
-        UNIT_ASSERT_EQUAL(future11.StateId(), future12.StateId()); // same result for different futures with the same state 
-        auto promise2 = MakePromise<T>(); 
-        auto future2 = promise2.GetFuture(); 
-        UNIT_ASSERT(future2.StateId().Defined()); 
-        UNIT_ASSERT_UNEQUAL(future11.StateId(), future2.StateId()); // different results for futures with different states 
-    } 
- 
-} 
- 
+    template <typename T>
+    auto MakePromise() {
+        if constexpr (std::is_same_v<T, void>) {
+            return NewPromise();
+        }
+        return NewPromise<T>();
+    }
+
+
+    template <typename T>
+    void TestFutureStateId() {
+        TFuture<T> empty;
+        UNIT_ASSERT(!empty.StateId().Defined());
+        auto promise1 = MakePromise<T>();
+        auto future11 = promise1.GetFuture();
+        UNIT_ASSERT(future11.StateId().Defined());
+        auto future12 = promise1.GetFuture();
+        UNIT_ASSERT_EQUAL(future11.StateId(), future11.StateId()); // same result for subsequent invocations
+        UNIT_ASSERT_EQUAL(future11.StateId(), future12.StateId()); // same result for different futures with the same state
+        auto promise2 = MakePromise<T>();
+        auto future2 = promise2.GetFuture();
+        UNIT_ASSERT(future2.StateId().Defined());
+        UNIT_ASSERT_UNEQUAL(future11.StateId(), future2.StateId()); // different results for futures with different states
+    }
+
+}
+
     ////////////////////////////////////////////////////////////////////////////////
 
     Y_UNIT_TEST_SUITE(TFutureTest) {
@@ -577,11 +577,11 @@ namespace {
             promise1.SetValue();
             UNIT_ASSERT_EXCEPTION_CONTAINS(wait.GetValueSync(), yexception, "foo-exception");
         }
- 
-        Y_UNIT_TEST(FutureStateId) { 
-            TestFutureStateId<void>(); 
-            TestFutureStateId<int>(); 
-        } 
+
+        Y_UNIT_TEST(FutureStateId) {
+            TestFutureStateId<void>();
+            TestFutureStateId<int>();
+        }
 
         template <typename T>
         void TestApplyNoRvalueCopyImpl() {

+ 104 - 104
library/cpp/threading/future/subscription/README.md

@@ -1,104 +1,104 @@
-Subscriptions manager and wait primitives library 
-================================================= 
- 
-Wait primitives 
---------------- 
- 
-All wait primitives are futures those being signaled when some or all of theirs dependencies are signaled. 
-Wait privimitives could be constructed either from an initializer_list or from a standard container of futures. 
- 
-1. WaitAll is signaled when all its dependencies are signaled: 
- 
-    ```C++ 
-    #include <library/cpp/threading/subscriptions/wait_all.h> 
- 
-    auto w = NWait::WaitAll({ future1, future2, ..., futureN }); 
-    ... 
-    w.Wait(); // wait for all futures 
-    ``` 
- 
-2. WaitAny is signaled when any of its dependencies is signaled: 
- 
-    ```C++ 
-    #include <library/cpp/threading/subscriptions/wait_any.h> 
- 
-    auto w = NWait::WaitAny(TVector<TFuture<T>>{ future1, future2, ..., futureN }); 
-    ... 
-    w.Wait(); // wait for any future 
-    ``` 
- 
-3. WaitAllOrException is signaled when all its dependencies are signaled with values or any dependency is signaled with an exception: 
- 
-    ```C++ 
-    #include <library/cpp/threading/subscriptions/wait_all_or_exception.h> 
- 
-    auto w = NWait::WaitAllOrException(TVector<TFuture<T>>{ future1, future2, ..., futureN }); 
-    ... 
-    w.Wait(); // wait for all values or for an exception 
-    ``` 
- 
-Subscriptions manager 
---------------------- 
- 
-The subscription manager can manage multiple links beetween futures and callbacks. Multiple managed subscriptions to a single future shares just a single underlying subscription to the future. That allows dynamic creation and deletion of subscriptions and efficient implementation of different wait primitives. 
-The subscription manager could be used in the following way: 
- 
-1. Subscribe to a single future: 
- 
-    ```C++ 
-    #include <library/cpp/threading/subscriptions/subscription.h> 
- 
-    TFuture<int> LongOperation(); 
- 
-    ... 
-    auto future = LongRunnigOperation(); 
-    auto m = MakeSubsriptionManager<int>(); 
-    auto id = m->Subscribe(future, [](TFuture<int> const& f) { 
-        try { 
-            auto value = f.GetValue(); 
-            ... 
-        } catch (...) { 
-            ... // handle exception 
-        } 
-    }); 
-    if (id.has_value()) { 
-        ... // Callback will run asynchronously 
-    } else { 
-        ... // Future has been signaled already. The callback has been invoked synchronously 
-    } 
-    ``` 
- 
-    Note that a callback could be invoked synchronously during a Subscribe call. In this case the returned optional will have no value. 
- 
-2. Unsubscribe from a single future: 
- 
-    ```C++ 
-    // id holds the subscription id from a previous Subscribe call 
-    m->Unsubscribe(id.value()); 
-    ``` 
- 
-    There is no need to call Unsubscribe if the callback has been called. In this case Unsubscribe will do nothing. And it is safe to call Unsubscribe with the same id multiple times. 
- 
-3. Subscribe a single callback to multiple futures: 
- 
-    ```C++ 
-    auto ids = m->Subscribe({ future1, future2, ..., futureN }, [](auto&& f) { ... }); 
-    ... 
-    ``` 
- 
-    Futures could be passed to Subscribe method either via an initializer_list or via a standard container like vector or list. Subscribe method accept an optional boolean parameter revertOnSignaled. If the parameter is false (default) then all suscriptions will be performed regardless of the futures states and the returned vector will have a subscription id for each future (even if callback has been executed synchronously for some futures). Otherwise the method will stop on the first signaled future (the callback will be synchronously called for it), no suscriptions will be created and an empty vector will be returned. 
- 
-4. Unsubscribe multiple subscriptions: 
- 
-    ```C++ 
-    // ids is the vector or subscription ids 
-    m->Unsubscribe(ids); 
-    ``` 
- 
-    The vector of IDs could be a result of a previous Subscribe call or an arbitrary set of IDs of previously created subscriptions. 
- 
-5. If you do not want to instantiate a new instance of the subscription manager it is possible to use the default instance: 
- 
-    ```C++ 
-    auto m = TSubscriptionManager<T>::Default(); 
-    ``` 
+Subscriptions manager and wait primitives library
+=================================================
+
+Wait primitives
+---------------
+
+All wait primitives are futures those being signaled when some or all of theirs dependencies are signaled.
+Wait privimitives could be constructed either from an initializer_list or from a standard container of futures.
+
+1. WaitAll is signaled when all its dependencies are signaled:
+
+    ```C++
+    #include <library/cpp/threading/subscriptions/wait_all.h>
+
+    auto w = NWait::WaitAll({ future1, future2, ..., futureN });
+    ...
+    w.Wait(); // wait for all futures
+    ```
+
+2. WaitAny is signaled when any of its dependencies is signaled:
+
+    ```C++
+    #include <library/cpp/threading/subscriptions/wait_any.h>
+
+    auto w = NWait::WaitAny(TVector<TFuture<T>>{ future1, future2, ..., futureN });
+    ...
+    w.Wait(); // wait for any future
+    ```
+
+3. WaitAllOrException is signaled when all its dependencies are signaled with values or any dependency is signaled with an exception:
+
+    ```C++
+    #include <library/cpp/threading/subscriptions/wait_all_or_exception.h>
+
+    auto w = NWait::WaitAllOrException(TVector<TFuture<T>>{ future1, future2, ..., futureN });
+    ...
+    w.Wait(); // wait for all values or for an exception
+    ```
+
+Subscriptions manager
+---------------------
+
+The subscription manager can manage multiple links beetween futures and callbacks. Multiple managed subscriptions to a single future shares just a single underlying subscription to the future. That allows dynamic creation and deletion of subscriptions and efficient implementation of different wait primitives.
+The subscription manager could be used in the following way:
+
+1. Subscribe to a single future:
+
+    ```C++
+    #include <library/cpp/threading/subscriptions/subscription.h>
+
+    TFuture<int> LongOperation();
+
+    ...
+    auto future = LongRunnigOperation();
+    auto m = MakeSubsriptionManager<int>();
+    auto id = m->Subscribe(future, [](TFuture<int> const& f) {
+        try {
+            auto value = f.GetValue();
+            ...
+        } catch (...) {
+            ... // handle exception
+        }
+    });
+    if (id.has_value()) {
+        ... // Callback will run asynchronously
+    } else {
+        ... // Future has been signaled already. The callback has been invoked synchronously
+    }
+    ```
+
+    Note that a callback could be invoked synchronously during a Subscribe call. In this case the returned optional will have no value.
+
+2. Unsubscribe from a single future:
+
+    ```C++
+    // id holds the subscription id from a previous Subscribe call
+    m->Unsubscribe(id.value());
+    ```
+
+    There is no need to call Unsubscribe if the callback has been called. In this case Unsubscribe will do nothing. And it is safe to call Unsubscribe with the same id multiple times.
+
+3. Subscribe a single callback to multiple futures:
+
+    ```C++
+    auto ids = m->Subscribe({ future1, future2, ..., futureN }, [](auto&& f) { ... });
+    ...
+    ```
+
+    Futures could be passed to Subscribe method either via an initializer_list or via a standard container like vector or list. Subscribe method accept an optional boolean parameter revertOnSignaled. If the parameter is false (default) then all suscriptions will be performed regardless of the futures states and the returned vector will have a subscription id for each future (even if callback has been executed synchronously for some futures). Otherwise the method will stop on the first signaled future (the callback will be synchronously called for it), no suscriptions will be created and an empty vector will be returned.
+
+4. Unsubscribe multiple subscriptions:
+
+    ```C++
+    // ids is the vector or subscription ids
+    m->Unsubscribe(ids);
+    ```
+
+    The vector of IDs could be a result of a previous Subscribe call or an arbitrary set of IDs of previously created subscriptions.
+
+5. If you do not want to instantiate a new instance of the subscription manager it is possible to use the default instance:
+
+    ```C++
+    auto m = TSubscriptionManager<T>::Default();
+    ```

+ 118 - 118
library/cpp/threading/future/subscription/subscription-inl.h

@@ -1,118 +1,118 @@
-#pragma once 
- 
-#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H) 
-#error "you should never include subscription-inl.h directly" 
-#endif 
- 
-namespace NThreading { 
- 
-namespace NPrivate { 
- 
-template <typename T> 
-TFutureStateId CheckedStateId(TFuture<T> const& future) { 
-    auto const id = future.StateId(); 
-    if (id.Defined()) { 
-        return *id; 
-    } 
-    ythrow TFutureException() << "Future state should be initialized"; 
-} 
- 
-} 
- 
-template <typename T, typename F, typename TCallbackExecutor> 
-inline TSubscriptionManager::TSubscription::TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor) 
-    : Callback( 
-            [future = std::move(future), callback = std::forward<F>(callback), executor = std::forward<TCallbackExecutor>(executor)]() mutable { 
-                executor(std::as_const(future), callback); 
-            }) 
-{ 
-} 
- 
-template <typename T, typename F, typename TCallbackExecutor> 
-inline std::optional<TSubscriptionId> TSubscriptionManager::Subscribe(TFuture<T> const& future, F&& callback, TCallbackExecutor&& executor) { 
-    auto stateId = NPrivate::CheckedStateId(future); 
-    with_lock(Lock) { 
-        auto const status = TrySubscribe(future, std::forward<F>(callback), stateId, std::forward<TCallbackExecutor>(executor)); 
-        switch (status) { 
-            case ECallbackStatus::Subscribed: 
-                return TSubscriptionId(stateId, Revision); 
-            case ECallbackStatus::ExecutedSynchronously: 
-                return {}; 
-            default: 
-                Y_FAIL("Unexpected callback status"); 
-        } 
-    } 
-} 
- 
-template <typename TFutures, typename F, typename TCallbackExecutor> 
-inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled 
-                                                                , TCallbackExecutor&& executor) 
-{ 
-    return SubscribeImpl(futures, std::forward<F>(callback), revertOnSignaled, std::forward<TCallbackExecutor>(executor)); 
-} 
- 
-template <typename T, typename F, typename TCallbackExecutor> 
-inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback 
-                                                                , bool revertOnSignaled, TCallbackExecutor&& executor) 
-{ 
-    return SubscribeImpl(futures, std::forward<F>(callback), revertOnSignaled, std::forward<TCallbackExecutor>(executor)); 
-} 
- 
-template <typename T, typename F, typename TCallbackExecutor> 
-inline TSubscriptionManager::ECallbackStatus TSubscriptionManager::TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId 
-                                                                                , TCallbackExecutor&& executor) 
-{ 
-    TSubscription subscription(future, std::forward<F>(callback), std::forward<TCallbackExecutor>(executor)); 
-    auto const it = Subscriptions.find(stateId); 
-    auto const revision = ++Revision; 
-    if (it == std::end(Subscriptions)) { 
-        auto const success = Subscriptions.emplace(stateId, THashMap<ui64, TSubscription>{ { revision, std::move(subscription) } }).second; 
-        Y_VERIFY(success); 
-        auto self = TSubscriptionManagerPtr(this); 
-        future.Subscribe([self, stateId](TFuture<T> const&) { self->OnCallback(stateId); }); 
-        if (Subscriptions.find(stateId) == std::end(Subscriptions)) { 
-            return ECallbackStatus::ExecutedSynchronously; 
-        } 
-    } else { 
-        Y_VERIFY(it->second.emplace(revision, std::move(subscription)).second); 
-    } 
-    return ECallbackStatus::Subscribed; 
-} 
- 
-template <typename TFutures, typename F, typename TCallbackExecutor> 
-inline TVector<TSubscriptionId> TSubscriptionManager::SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled 
-                                                                    , TCallbackExecutor const& executor) 
-{ 
-    TVector<TSubscriptionId> results; 
-    results.reserve(std::size(futures)); 
-    // resolve all state ids to minimize processing under the lock 
-    for (auto const& f : futures) { 
-        results.push_back(TSubscriptionId(NPrivate::CheckedStateId(f), 0)); 
-    } 
-    with_lock(Lock) { 
-        size_t i = 0; 
-        for (auto const& f : futures) { 
-            auto& r = results[i]; 
-            auto const status = TrySubscribe(f, callback, r.StateId(), executor); 
-            switch (status) { 
-                case ECallbackStatus::Subscribed: 
-                    break; 
-                case ECallbackStatus::ExecutedSynchronously: 
-                    if (revertOnSignaled) { 
-                        // revert 
-                        results.crop(i); 
-                        UnsubscribeImpl(results); 
-                        return {}; 
-                    } 
-                    break; 
-                default: 
-                    Y_FAIL("Unexpected callback status"); 
-            } 
-            r.SetSubId(Revision); 
-            ++i; 
-        } 
-    } 
-    return results; 
-} 
- 
-} 
+#pragma once
+
+#if !defined(INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H)
+#error "you should never include subscription-inl.h directly"
+#endif
+
+namespace NThreading {
+
+namespace NPrivate {
+
+template <typename T>
+TFutureStateId CheckedStateId(TFuture<T> const& future) {
+    auto const id = future.StateId();
+    if (id.Defined()) {
+        return *id;
+    }
+    ythrow TFutureException() << "Future state should be initialized";
+}
+
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline TSubscriptionManager::TSubscription::TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor)
+    : Callback(
+            [future = std::move(future), callback = std::forward<F>(callback), executor = std::forward<TCallbackExecutor>(executor)]() mutable {
+                executor(std::as_const(future), callback);
+            })
+{
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline std::optional<TSubscriptionId> TSubscriptionManager::Subscribe(TFuture<T> const& future, F&& callback, TCallbackExecutor&& executor) {
+    auto stateId = NPrivate::CheckedStateId(future);
+    with_lock(Lock) {
+        auto const status = TrySubscribe(future, std::forward<F>(callback), stateId, std::forward<TCallbackExecutor>(executor));
+        switch (status) {
+            case ECallbackStatus::Subscribed:
+                return TSubscriptionId(stateId, Revision);
+            case ECallbackStatus::ExecutedSynchronously:
+                return {};
+            default:
+                Y_FAIL("Unexpected callback status");
+        }
+    }
+}
+
+template <typename TFutures, typename F, typename TCallbackExecutor>
+inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled
+                                                                , TCallbackExecutor&& executor)
+{
+    return SubscribeImpl(futures, std::forward<F>(callback), revertOnSignaled, std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline TVector<TSubscriptionId> TSubscriptionManager::Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback
+                                                                , bool revertOnSignaled, TCallbackExecutor&& executor)
+{
+    return SubscribeImpl(futures, std::forward<F>(callback), revertOnSignaled, std::forward<TCallbackExecutor>(executor));
+}
+
+template <typename T, typename F, typename TCallbackExecutor>
+inline TSubscriptionManager::ECallbackStatus TSubscriptionManager::TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId
+                                                                                , TCallbackExecutor&& executor)
+{
+    TSubscription subscription(future, std::forward<F>(callback), std::forward<TCallbackExecutor>(executor));
+    auto const it = Subscriptions.find(stateId);
+    auto const revision = ++Revision;
+    if (it == std::end(Subscriptions)) {
+        auto const success = Subscriptions.emplace(stateId, THashMap<ui64, TSubscription>{ { revision, std::move(subscription) } }).second;
+        Y_VERIFY(success);
+        auto self = TSubscriptionManagerPtr(this);
+        future.Subscribe([self, stateId](TFuture<T> const&) { self->OnCallback(stateId); });
+        if (Subscriptions.find(stateId) == std::end(Subscriptions)) {
+            return ECallbackStatus::ExecutedSynchronously;
+        }
+    } else {
+        Y_VERIFY(it->second.emplace(revision, std::move(subscription)).second);
+    }
+    return ECallbackStatus::Subscribed;
+}
+
+template <typename TFutures, typename F, typename TCallbackExecutor>
+inline TVector<TSubscriptionId> TSubscriptionManager::SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled
+                                                                    , TCallbackExecutor const& executor)
+{
+    TVector<TSubscriptionId> results;
+    results.reserve(std::size(futures));
+    // resolve all state ids to minimize processing under the lock
+    for (auto const& f : futures) {
+        results.push_back(TSubscriptionId(NPrivate::CheckedStateId(f), 0));
+    }
+    with_lock(Lock) {
+        size_t i = 0;
+        for (auto const& f : futures) {
+            auto& r = results[i];
+            auto const status = TrySubscribe(f, callback, r.StateId(), executor);
+            switch (status) {
+                case ECallbackStatus::Subscribed:
+                    break;
+                case ECallbackStatus::ExecutedSynchronously:
+                    if (revertOnSignaled) {
+                        // revert
+                        results.crop(i);
+                        UnsubscribeImpl(results);
+                        return {};
+                    }
+                    break;
+                default:
+                    Y_FAIL("Unexpected callback status");
+            }
+            r.SetSubId(Revision);
+            ++i;
+        }
+    }
+    return results;
+}
+
+}

+ 65 - 65
library/cpp/threading/future/subscription/subscription.cpp

@@ -1,65 +1,65 @@
-#include "subscription.h" 
- 
-namespace NThreading { 
- 
-bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept { 
-    return l.StateId() == r.StateId() && l.SubId() == r.SubId(); 
-} 
- 
-bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept { 
-    return !(l == r); 
-} 
- 
-void TSubscriptionManager::TSubscription::operator()() { 
-    Callback(); 
-} 
- 
-TSubscriptionManagerPtr TSubscriptionManager::NewInstance() { 
-    return new TSubscriptionManager(); 
-} 
- 
-TSubscriptionManagerPtr TSubscriptionManager::Default() { 
-    static auto instance = NewInstance(); 
-    return instance; 
-} 
- 
-void TSubscriptionManager::Unsubscribe(TSubscriptionId id) { 
-    with_lock(Lock) { 
-        UnsubscribeImpl(id); 
-    } 
-} 
- 
-void TSubscriptionManager::Unsubscribe(TVector<TSubscriptionId> const& ids) { 
-    with_lock(Lock) { 
-        UnsubscribeImpl(ids); 
-    } 
-} 
- 
-void TSubscriptionManager::OnCallback(TFutureStateId stateId) noexcept { 
-    THashMap<ui64, TSubscription> subscriptions; 
-    with_lock(Lock) { 
-        auto const it = Subscriptions.find(stateId); 
-        Y_VERIFY(it != Subscriptions.end(), "The callback has been triggered more than once"); 
-        subscriptions.swap(it->second); 
-        Subscriptions.erase(it); 
-    } 
-    for (auto& [_, subscription] : subscriptions) { 
-        subscription(); 
-    } 
-} 
- 
-void TSubscriptionManager::UnsubscribeImpl(TSubscriptionId id) { 
-    auto const it = Subscriptions.find(id.StateId()); 
-    if (it == std::end(Subscriptions)) { 
-        return; 
-    } 
-    it->second.erase(id.SubId()); 
-} 
- 
-void TSubscriptionManager::UnsubscribeImpl(TVector<TSubscriptionId> const& ids) { 
-    for (auto const& id : ids) { 
-        UnsubscribeImpl(id); 
-    } 
-} 
- 
-} 
+#include "subscription.h"
+
+namespace NThreading {
+
+bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept {
+    return l.StateId() == r.StateId() && l.SubId() == r.SubId();
+}
+
+bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept {
+    return !(l == r);
+}
+
+void TSubscriptionManager::TSubscription::operator()() {
+    Callback();
+}
+
+TSubscriptionManagerPtr TSubscriptionManager::NewInstance() {
+    return new TSubscriptionManager();
+}
+
+TSubscriptionManagerPtr TSubscriptionManager::Default() {
+    static auto instance = NewInstance();
+    return instance;
+}
+
+void TSubscriptionManager::Unsubscribe(TSubscriptionId id) {
+    with_lock(Lock) {
+        UnsubscribeImpl(id);
+    }
+}
+
+void TSubscriptionManager::Unsubscribe(TVector<TSubscriptionId> const& ids) {
+    with_lock(Lock) {
+        UnsubscribeImpl(ids);
+    }
+}
+
+void TSubscriptionManager::OnCallback(TFutureStateId stateId) noexcept {
+    THashMap<ui64, TSubscription> subscriptions;
+    with_lock(Lock) {
+        auto const it = Subscriptions.find(stateId);
+        Y_VERIFY(it != Subscriptions.end(), "The callback has been triggered more than once");
+        subscriptions.swap(it->second);
+        Subscriptions.erase(it);
+    }
+    for (auto& [_, subscription] : subscriptions) {
+        subscription();
+    }
+}
+
+void TSubscriptionManager::UnsubscribeImpl(TSubscriptionId id) {
+    auto const it = Subscriptions.find(id.StateId());
+    if (it == std::end(Subscriptions)) {
+        return;
+    }
+    it->second.erase(id.SubId());
+}
+
+void TSubscriptionManager::UnsubscribeImpl(TVector<TSubscriptionId> const& ids) {
+    for (auto const& id : ids) {
+        UnsubscribeImpl(id);
+    }
+}
+
+}

+ 186 - 186
library/cpp/threading/future/subscription/subscription.h

@@ -1,186 +1,186 @@
-#pragma once 
- 
-#include <library/cpp/threading/future/future.h> 
- 
-#include <util/generic/hash.h> 
-#include <util/generic/ptr.h> 
-#include <util/generic/vector.h> 
-#include <util/system/mutex.h> 
- 
-#include <functional> 
-#include <optional> 
-#include <utility> 
- 
-namespace NThreading { 
- 
-namespace NPrivate { 
- 
-struct TNoexceptExecutor { 
-    template <typename T, typename F> 
-    void operator()(TFuture<T> const& future, F&& callee) const noexcept { 
-        return callee(future); 
-    } 
-}; 
- 
-} 
- 
-class TSubscriptionManager; 
- 
-using TSubscriptionManagerPtr = TIntrusivePtr<TSubscriptionManager>; 
- 
-//! A subscription id 
-class TSubscriptionId { 
-private: 
-    TFutureStateId StateId_; 
-    ui64 SubId_; // Secondary id to make the whole subscription id unique 
- 
-    friend class TSubscriptionManager; 
- 
-public: 
-    TFutureStateId StateId() const noexcept { 
-        return StateId_; 
-    } 
- 
-    ui64 SubId() const noexcept { 
-        return SubId_; 
-    } 
- 
-private: 
-    TSubscriptionId(TFutureStateId stateId, ui64 subId) 
-        : StateId_(stateId) 
-        , SubId_(subId) 
-    { 
-    } 
- 
-    void SetSubId(ui64 subId) noexcept { 
-        SubId_ = subId; 
-    } 
-}; 
- 
-bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept; 
-bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept; 
- 
-//! The subscription manager manages subscriptions to futures 
-/** It provides an ability to create (and drop) multiple subscriptions to any future 
-    with just a single underlying subscription per future. 
- 
-    When a future is signaled all its subscriptions are removed. 
-    So, there no need to call Unsubscribe for subscriptions to already signaled futures. 
- 
-    Warning!!! For correct operation this class imposes the following requirement to futures/promises: 
-    Any used future must be signaled (value or exception set) before the future state destruction. 
-    Otherwise subscriptions and futures may happen. 
-    Current future design does not provide the required guarantee. But that should be fixed soon. 
-**/ 
-class TSubscriptionManager final : public TAtomicRefCount<TSubscriptionManager> { 
-private: 
-    //! A single subscription 
-    class TSubscription { 
-    private: 
-        std::function<void()> Callback; 
- 
-    public: 
-        template <typename T, typename F, typename TCallbackExecutor> 
-        TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor); 
- 
-        void operator()(); 
-    }; 
- 
-    struct TFutureStateIdHash { 
-        size_t operator()(TFutureStateId const id) const noexcept { 
-            auto const value = id.Value(); 
-            return ::hash<decltype(value)>()(value); 
-        } 
-    }; 
- 
-private: 
-    THashMap<TFutureStateId, THashMap<ui64, TSubscription>, TFutureStateIdHash> Subscriptions; 
-    ui64 Revision = 0; 
-    TMutex Lock; 
- 
-public: 
-    //! Creates a new subscription manager instance 
-    static TSubscriptionManagerPtr NewInstance(); 
- 
-    //! The default subscription manager instance 
-    static TSubscriptionManagerPtr Default(); 
- 
-    //! Attempts to subscribe the callback to the future 
-    /** Subscription should succeed if the future is not signaled yet. 
-        Otherwise the callback will be called synchronously and nullopt will be returned 
- 
-        @param future - The future to subscribe to 
-        @param callback - The callback to attach 
-        @return The subscription id on success, nullopt if the future has been signaled already 
-    **/ 
-    template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor> 
-    std::optional<TSubscriptionId> Subscribe(TFuture<T> const& future, F&& callback 
-                                                , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor()); 
- 
-    //! Drops the subscription with the given id 
-    /** @param id - The subscription id 
-    **/ 
-    void Unsubscribe(TSubscriptionId id); 
- 
-    //! Attempts to subscribe the callback to the set of futures 
-    /** @param futures - The futures to subscribe to 
-        @param callback - The callback to attach 
-        @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state 
-        @return The vector of subscription ids if no revert happened or an empty vector otherwise 
-                A subscription id will be valid even if a corresponding future has been signaled 
-    **/ 
-    template <typename TFutures, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor> 
-    TVector<TSubscriptionId> Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled = false 
-                                                    , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor()); 
- 
-    //! Attempts to subscribe the callback to the set of futures 
-    /** @param futures - The futures to subscribe to 
-        @param callback - The callback to attach 
-        @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state 
-        @return The vector of subscription ids if no revert happened or an empty vector otherwise 
-                A subscription id will be valid even if a corresponding future has been signaled 
-    **/ 
-    template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor> 
-    TVector<TSubscriptionId> Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback, bool revertOnSignaled = false 
-                                                    , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor()); 
- 
-    //! Drops the subscriptions with the given ids 
-    /** @param ids - The subscription ids 
-    **/ 
-    void Unsubscribe(TVector<TSubscriptionId> const& ids); 
- 
-private: 
-    enum class ECallbackStatus { 
-        Subscribed, //! A subscription has been created. The callback will be called asynchronously. 
-        ExecutedSynchronously //! A callback has been called synchronously. No subscription has been created 
-    }; 
- 
-private: 
-    //! .ctor 
-    TSubscriptionManager() = default; 
-    //! Processes a callback from a future 
-    void OnCallback(TFutureStateId stateId) noexcept; 
-    //! Attempts to create a subscription 
-    /** This method should be called under the lock 
-    **/ 
-    template <typename T, typename F, typename TCallbackExecutor> 
-    ECallbackStatus TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId, TCallbackExecutor&& executor); 
-    //! Batch subscribe implementation 
-    template <typename TFutures, typename F, typename TCallbackExecutor> 
-    TVector<TSubscriptionId> SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled 
-                                                        , TCallbackExecutor const& executor); 
-    //! Unsubscribe implementation 
-    /** This method should be called under the lock 
-    **/ 
-    void UnsubscribeImpl(TSubscriptionId id); 
-    //! Batch unsubscribe implementation 
-    /** This method should be called under the lock 
-    **/ 
-    void UnsubscribeImpl(TVector<TSubscriptionId> const& ids); 
-}; 
- 
-} 
- 
-#define INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H 
-#include "subscription-inl.h" 
-#undef INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H 
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/system/mutex.h>
+
+#include <functional>
+#include <optional>
+#include <utility>
+
+namespace NThreading {
+
+namespace NPrivate {
+
+struct TNoexceptExecutor {
+    template <typename T, typename F>
+    void operator()(TFuture<T> const& future, F&& callee) const noexcept {
+        return callee(future);
+    }
+};
+
+}
+
+class TSubscriptionManager;
+
+using TSubscriptionManagerPtr = TIntrusivePtr<TSubscriptionManager>;
+
+//! A subscription id
+class TSubscriptionId {
+private:
+    TFutureStateId StateId_;
+    ui64 SubId_; // Secondary id to make the whole subscription id unique
+
+    friend class TSubscriptionManager;
+
+public:
+    TFutureStateId StateId() const noexcept {
+        return StateId_;
+    }
+
+    ui64 SubId() const noexcept {
+        return SubId_;
+    }
+
+private:
+    TSubscriptionId(TFutureStateId stateId, ui64 subId)
+        : StateId_(stateId)
+        , SubId_(subId)
+    {
+    }
+
+    void SetSubId(ui64 subId) noexcept {
+        SubId_ = subId;
+    }
+};
+
+bool operator==(TSubscriptionId const& l, TSubscriptionId const& r) noexcept;
+bool operator!=(TSubscriptionId const& l, TSubscriptionId const& r) noexcept;
+
+//! The subscription manager manages subscriptions to futures
+/** It provides an ability to create (and drop) multiple subscriptions to any future
+    with just a single underlying subscription per future.
+
+    When a future is signaled all its subscriptions are removed.
+    So, there no need to call Unsubscribe for subscriptions to already signaled futures.
+
+    Warning!!! For correct operation this class imposes the following requirement to futures/promises:
+    Any used future must be signaled (value or exception set) before the future state destruction.
+    Otherwise subscriptions and futures may happen.
+    Current future design does not provide the required guarantee. But that should be fixed soon.
+**/
+class TSubscriptionManager final : public TAtomicRefCount<TSubscriptionManager> {
+private:
+    //! A single subscription
+    class TSubscription {
+    private:
+        std::function<void()> Callback;
+
+    public:
+        template <typename T, typename F, typename TCallbackExecutor>
+        TSubscription(TFuture<T> future, F&& callback, TCallbackExecutor&& executor);
+
+        void operator()();
+    };
+
+    struct TFutureStateIdHash {
+        size_t operator()(TFutureStateId const id) const noexcept {
+            auto const value = id.Value();
+            return ::hash<decltype(value)>()(value);
+        }
+    };
+
+private:
+    THashMap<TFutureStateId, THashMap<ui64, TSubscription>, TFutureStateIdHash> Subscriptions;
+    ui64 Revision = 0;
+    TMutex Lock;
+
+public:
+    //! Creates a new subscription manager instance
+    static TSubscriptionManagerPtr NewInstance();
+
+    //! The default subscription manager instance
+    static TSubscriptionManagerPtr Default();
+
+    //! Attempts to subscribe the callback to the future
+    /** Subscription should succeed if the future is not signaled yet.
+        Otherwise the callback will be called synchronously and nullopt will be returned
+
+        @param future - The future to subscribe to
+        @param callback - The callback to attach
+        @return The subscription id on success, nullopt if the future has been signaled already
+    **/
+    template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
+    std::optional<TSubscriptionId> Subscribe(TFuture<T> const& future, F&& callback
+                                                , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());
+
+    //! Drops the subscription with the given id
+    /** @param id - The subscription id
+    **/
+    void Unsubscribe(TSubscriptionId id);
+
+    //! Attempts to subscribe the callback to the set of futures
+    /** @param futures - The futures to subscribe to
+        @param callback - The callback to attach
+        @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state
+        @return The vector of subscription ids if no revert happened or an empty vector otherwise
+                A subscription id will be valid even if a corresponding future has been signaled
+    **/
+    template <typename TFutures, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
+    TVector<TSubscriptionId> Subscribe(TFutures const& futures, F&& callback, bool revertOnSignaled = false
+                                                    , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());
+
+    //! Attempts to subscribe the callback to the set of futures
+    /** @param futures - The futures to subscribe to
+        @param callback - The callback to attach
+        @param revertOnSignaled - Shows whether to stop and revert the subscription process if one of the futures is in signaled state
+        @return The vector of subscription ids if no revert happened or an empty vector otherwise
+                A subscription id will be valid even if a corresponding future has been signaled
+    **/
+    template <typename T, typename F, typename TCallbackExecutor = NPrivate::TNoexceptExecutor>
+    TVector<TSubscriptionId> Subscribe(std::initializer_list<TFuture<T> const> futures, F&& callback, bool revertOnSignaled = false
+                                                    , TCallbackExecutor&& executor = NPrivate::TNoexceptExecutor());
+
+    //! Drops the subscriptions with the given ids
+    /** @param ids - The subscription ids
+    **/
+    void Unsubscribe(TVector<TSubscriptionId> const& ids);
+
+private:
+    enum class ECallbackStatus {
+        Subscribed, //! A subscription has been created. The callback will be called asynchronously.
+        ExecutedSynchronously //! A callback has been called synchronously. No subscription has been created
+    };
+
+private:
+    //! .ctor
+    TSubscriptionManager() = default;
+    //! Processes a callback from a future
+    void OnCallback(TFutureStateId stateId) noexcept;
+    //! Attempts to create a subscription
+    /** This method should be called under the lock
+    **/
+    template <typename T, typename F, typename TCallbackExecutor>
+    ECallbackStatus TrySubscribe(TFuture<T> const& future, F&& callback, TFutureStateId stateId, TCallbackExecutor&& executor);
+    //! Batch subscribe implementation
+    template <typename TFutures, typename F, typename TCallbackExecutor>
+    TVector<TSubscriptionId> SubscribeImpl(TFutures const& futures, F const& callback, bool revertOnSignaled
+                                                        , TCallbackExecutor const& executor);
+    //! Unsubscribe implementation
+    /** This method should be called under the lock
+    **/
+    void UnsubscribeImpl(TSubscriptionId id);
+    //! Batch unsubscribe implementation
+    /** This method should be called under the lock
+    **/
+    void UnsubscribeImpl(TVector<TSubscriptionId> const& ids);
+};
+
+}
+
+#define INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H
+#include "subscription-inl.h"
+#undef INCLUDE_LIBRARY_THREADING_FUTURE_SUBSCRIPTION_INL_H

+ 431 - 431
library/cpp/threading/future/subscription/subscription_ut.cpp

@@ -1,432 +1,432 @@
-#include "subscription.h" 
- 
+#include "subscription.h"
+
 #include <library/cpp/testing/unittest/registar.h>
- 
-using namespace NThreading; 
- 
-Y_UNIT_TEST_SUITE(TSubscriptionManagerTest) { 
- 
-    Y_UNIT_TEST(TestSubscribeUnsignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount = 0; 
-        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } ); 
-        UNIT_ASSERT(id.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeSignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto f = MakeFuture(); 
- 
-        size_t callCount = 0; 
-        auto id = m->Subscribe(f, [&callCount](auto&&) { ++callCount; } ); 
-        UNIT_ASSERT(!id.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeUnsignaledAndSignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } ); 
-        UNIT_ASSERT(id1.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
- 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } ); 
-        UNIT_ASSERT(!id2.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeUnsubscribeUnsignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount = 0; 
-        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } ); 
-        UNIT_ASSERT(id.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        m->Unsubscribe(id.value()); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeUnsignaledUnsubscribeSignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount = 0; 
-        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } ); 
-        UNIT_ASSERT(id.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
- 
-        m->Unsubscribe(id.value()); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestUnsubscribeTwice) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount = 0; 
-        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } ); 
-        UNIT_ASSERT(id.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        m->Unsubscribe(id.value()); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
-        m->Unsubscribe(id.value()); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeOneUnsignaledManyTimes) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } ); 
- 
-        UNIT_ASSERT(id1.has_value()); 
-        UNIT_ASSERT(id2.has_value()); 
-        UNIT_ASSERT(id3.has_value()); 
-        UNIT_ASSERT_UNEQUAL(id1.value(), id2.value()); 
-        UNIT_ASSERT_UNEQUAL(id2.value(), id3.value()); 
-        UNIT_ASSERT_UNEQUAL(id3.value(), id1.value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeOneSignaledManyTimes) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto f = MakeFuture(); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(f, [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(f, [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } ); 
- 
-        UNIT_ASSERT(!id1.has_value()); 
-        UNIT_ASSERT(!id2.has_value()); 
-        UNIT_ASSERT(!id3.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeUnsubscribeOneUnsignaledManyTimes) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p = NewPromise(); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } ); 
-        size_t callCount4 = 0; 
-        auto id4 = m->Subscribe(p.GetFuture(), [&callCount4](auto&&) { ++callCount4; } ); 
- 
-        UNIT_ASSERT(id1.has_value()); 
-        UNIT_ASSERT(id2.has_value()); 
-        UNIT_ASSERT(id3.has_value()); 
-        UNIT_ASSERT(id4.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
-        UNIT_ASSERT_EQUAL(callCount4, 0); 
- 
-        m->Unsubscribe(id3.value()); 
-        m->Unsubscribe(id1.value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
-        UNIT_ASSERT_EQUAL(callCount4, 0); 
- 
-        p.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
-        UNIT_ASSERT_EQUAL(callCount4, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeManyUnsignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise<int>(); 
-        auto p2 = NewPromise<int>(); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(p1.GetFuture(), [&callCount3](auto&&) { ++callCount3; } ); 
- 
-        UNIT_ASSERT(id1.has_value()); 
-        UNIT_ASSERT(id2.has_value()); 
-        UNIT_ASSERT(id3.has_value()); 
-        UNIT_ASSERT_UNEQUAL(id1.value(), id2.value()); 
-        UNIT_ASSERT_UNEQUAL(id2.value(), id3.value()); 
-        UNIT_ASSERT_UNEQUAL(id3.value(), id1.value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
- 
-        p1.SetValue(33); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
- 
-        p2.SetValue(111); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeManySignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto f1 = MakeFuture(0); 
-        auto f2 = MakeFuture(1); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(f1, [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(f2, [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(f2, [&callCount3](auto&&) { ++callCount3; } ); 
- 
-        UNIT_ASSERT(!id1.has_value()); 
-        UNIT_ASSERT(!id2.has_value()); 
-        UNIT_ASSERT(!id3.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeManyMixed) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise<int>(); 
-        auto p2 = NewPromise<int>(); 
-        auto f = MakeFuture(42); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } ); 
- 
-        UNIT_ASSERT(id1.has_value()); 
-        UNIT_ASSERT(id2.has_value()); 
-        UNIT_ASSERT(!id3.has_value()); 
-        UNIT_ASSERT_UNEQUAL(id1.value(), id2.value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
- 
-        p1.SetValue(45); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
- 
-        p2.SetValue(-7); 
-        UNIT_ASSERT_EQUAL(callCount1, 1); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestSubscribeUnsubscribeMany) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise<int>(); 
-        auto p2 = NewPromise<int>(); 
-        auto p3 = NewPromise<int>(); 
- 
-        size_t callCount1 = 0; 
-        auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } ); 
-        size_t callCount2 = 0; 
-        auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } ); 
-        size_t callCount3 = 0; 
-        auto id3 = m->Subscribe(p3.GetFuture(), [&callCount3](auto&&) { ++callCount3; } ); 
-        size_t callCount4 = 0; 
-        auto id4 = m->Subscribe(p2.GetFuture(), [&callCount4](auto&&) { ++callCount4; } ); 
-        size_t callCount5 = 0; 
-        auto id5 = m->Subscribe(p1.GetFuture(), [&callCount5](auto&&) { ++callCount5; } ); 
- 
-        UNIT_ASSERT(id1.has_value()); 
-        UNIT_ASSERT(id2.has_value()); 
-        UNIT_ASSERT(id3.has_value()); 
-        UNIT_ASSERT(id4.has_value()); 
-        UNIT_ASSERT(id5.has_value()); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
-        UNIT_ASSERT_EQUAL(callCount4, 0); 
-        UNIT_ASSERT_EQUAL(callCount5, 0); 
- 
-        m->Unsubscribe(id1.value()); 
-        p1.SetValue(-1); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 0); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
-        UNIT_ASSERT_EQUAL(callCount4, 0); 
-        UNIT_ASSERT_EQUAL(callCount5, 1); 
- 
-        m->Unsubscribe(id4.value()); 
-        p2.SetValue(23); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 0); 
-        UNIT_ASSERT_EQUAL(callCount4, 0); 
-        UNIT_ASSERT_EQUAL(callCount5, 1); 
- 
-        p3.SetValue(100500); 
-        UNIT_ASSERT_EQUAL(callCount1, 0); 
-        UNIT_ASSERT_EQUAL(callCount2, 1); 
-        UNIT_ASSERT_EQUAL(callCount3, 1); 
-        UNIT_ASSERT_EQUAL(callCount4, 0); 
-        UNIT_ASSERT_EQUAL(callCount5, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestBulkSubscribeManyUnsignaled) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise<int>(); 
-        auto p2 = NewPromise<int>(); 
- 
-        size_t callCount = 0; 
-        auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), p1.GetFuture() }, [&callCount](auto&&) { ++callCount; }); 
- 
-        UNIT_ASSERT_EQUAL(ids.size(), 3); 
-        UNIT_ASSERT_UNEQUAL(ids[0], ids[1]); 
-        UNIT_ASSERT_UNEQUAL(ids[1], ids[2]); 
-        UNIT_ASSERT_UNEQUAL(ids[2], ids[0]); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        p1.SetValue(33); 
-        UNIT_ASSERT_EQUAL(callCount, 2); 
- 
-        p2.SetValue(111); 
-        UNIT_ASSERT_EQUAL(callCount, 3); 
-    } 
- 
-    Y_UNIT_TEST(TestBulkSubscribeManySignaledNoRevert) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto f1 = MakeFuture(0); 
-        auto f2 = MakeFuture(1); 
- 
-        size_t callCount = 0; 
-        auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; }); 
- 
-        UNIT_ASSERT_EQUAL(ids.size(), 3); 
-        UNIT_ASSERT_UNEQUAL(ids[0], ids[1]); 
-        UNIT_ASSERT_UNEQUAL(ids[1], ids[2]); 
-        UNIT_ASSERT_UNEQUAL(ids[2], ids[0]); 
-        UNIT_ASSERT_EQUAL(callCount, 3); 
-    } 
- 
-    Y_UNIT_TEST(TestBulkSubscribeManySignaledRevert) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto f1 = MakeFuture(0); 
-        auto f2 = MakeFuture(1); 
- 
-        size_t callCount = 0; 
-        auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; }, true); 
- 
-        UNIT_ASSERT(ids.empty()); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestBulkSubscribeManyMixedNoRevert) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise<int>(); 
-        auto p2 = NewPromise<int>(); 
-        auto f = MakeFuture(42); 
- 
-        size_t callCount = 0; 
-        auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), f }, [&callCount](auto&&) { ++callCount; } ); 
- 
-        UNIT_ASSERT_EQUAL(ids.size(), 3); 
-        UNIT_ASSERT_UNEQUAL(ids[0], ids[1]); 
-        UNIT_ASSERT_UNEQUAL(ids[1], ids[2]); 
-        UNIT_ASSERT_UNEQUAL(ids[2], ids[0]); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
- 
-        p1.SetValue(45); 
-        UNIT_ASSERT_EQUAL(callCount, 2); 
- 
-        p2.SetValue(-7); 
-        UNIT_ASSERT_EQUAL(callCount, 3); 
-    } 
- 
-    Y_UNIT_TEST(TestBulkSubscribeManyMixedRevert) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise(); 
-        auto p2 = NewPromise(); 
-        auto f = MakeFuture(); 
- 
-        size_t callCount = 0; 
-        auto ids = m->Subscribe({ p1.GetFuture(), f, p2.GetFuture() }, [&callCount](auto&&) { ++callCount; }, true); 
- 
-        UNIT_ASSERT(ids.empty()); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
- 
-        p1.SetValue(); 
-        p2.SetValue(); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
-    } 
- 
-    Y_UNIT_TEST(TestBulkSubscribeUnsubscribeMany) { 
-        auto m = TSubscriptionManager::NewInstance(); 
-        auto p1 = NewPromise<int>(); 
-        auto p2 = NewPromise<int>(); 
-        auto p3 = NewPromise<int>(); 
- 
-        size_t callCount = 0; 
-        auto ids = m->Subscribe( 
-                        TVector<TFuture<int>>{ p1.GetFuture(), p2.GetFuture(), p3.GetFuture(), p2.GetFuture(), p1.GetFuture() } 
-                        , [&callCount](auto&&) { ++callCount; } ); 
- 
-        UNIT_ASSERT_EQUAL(ids.size(), 5); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        m->Unsubscribe(TVector<TSubscriptionId>{ ids[0], ids[3] }); 
-        UNIT_ASSERT_EQUAL(callCount, 0); 
- 
-        p1.SetValue(-1); 
-        UNIT_ASSERT_EQUAL(callCount, 1); 
- 
-        p2.SetValue(23); 
-        UNIT_ASSERT_EQUAL(callCount, 2); 
- 
-        p3.SetValue(100500); 
-        UNIT_ASSERT_EQUAL(callCount, 3); 
-    } 
-} 
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TSubscriptionManagerTest) {
+
+    Y_UNIT_TEST(TestSubscribeUnsignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount = 0;
+        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+        UNIT_ASSERT(id.has_value());
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeSignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto f = MakeFuture();
+
+        size_t callCount = 0;
+        auto id = m->Subscribe(f, [&callCount](auto&&) { ++callCount; } );
+        UNIT_ASSERT(!id.has_value());
+        UNIT_ASSERT_EQUAL(callCount, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeUnsignaledAndSignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+        UNIT_ASSERT(id1.has_value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+        UNIT_ASSERT(!id2.has_value());
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeUnsubscribeUnsignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount = 0;
+        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+        UNIT_ASSERT(id.has_value());
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        m->Unsubscribe(id.value());
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount, 0);
+    }
+
+    Y_UNIT_TEST(TestSubscribeUnsignaledUnsubscribeSignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount = 0;
+        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+        UNIT_ASSERT(id.has_value());
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount, 1);
+
+        m->Unsubscribe(id.value());
+        UNIT_ASSERT_EQUAL(callCount, 1);
+    }
+
+    Y_UNIT_TEST(TestUnsubscribeTwice) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount = 0;
+        auto id = m->Subscribe(p.GetFuture(), [&callCount](auto&&) { ++callCount; } );
+        UNIT_ASSERT(id.has_value());
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        m->Unsubscribe(id.value());
+        UNIT_ASSERT_EQUAL(callCount, 0);
+        m->Unsubscribe(id.value());
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount, 0);
+    }
+
+    Y_UNIT_TEST(TestSubscribeOneUnsignaledManyTimes) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+
+        UNIT_ASSERT(id1.has_value());
+        UNIT_ASSERT(id2.has_value());
+        UNIT_ASSERT(id3.has_value());
+        UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+        UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
+        UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeOneSignaledManyTimes) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto f = MakeFuture();
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(f, [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(f, [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
+
+        UNIT_ASSERT(!id1.has_value());
+        UNIT_ASSERT(!id2.has_value());
+        UNIT_ASSERT(!id3.has_value());
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeUnsubscribeOneUnsignaledManyTimes) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p = NewPromise();
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(p.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(p.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(p.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+        size_t callCount4 = 0;
+        auto id4 = m->Subscribe(p.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
+
+        UNIT_ASSERT(id1.has_value());
+        UNIT_ASSERT(id2.has_value());
+        UNIT_ASSERT(id3.has_value());
+        UNIT_ASSERT(id4.has_value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+        UNIT_ASSERT_EQUAL(callCount4, 0);
+
+        m->Unsubscribe(id3.value());
+        m->Unsubscribe(id1.value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+        UNIT_ASSERT_EQUAL(callCount4, 0);
+
+        p.SetValue();
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+        UNIT_ASSERT_EQUAL(callCount4, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeManyUnsignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise<int>();
+        auto p2 = NewPromise<int>();
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(p1.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+
+        UNIT_ASSERT(id1.has_value());
+        UNIT_ASSERT(id2.has_value());
+        UNIT_ASSERT(id3.has_value());
+        UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+        UNIT_ASSERT_UNEQUAL(id2.value(), id3.value());
+        UNIT_ASSERT_UNEQUAL(id3.value(), id1.value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+
+        p1.SetValue(33);
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+
+        p2.SetValue(111);
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeManySignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto f1 = MakeFuture(0);
+        auto f2 = MakeFuture(1);
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(f1, [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(f2, [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(f2, [&callCount3](auto&&) { ++callCount3; } );
+
+        UNIT_ASSERT(!id1.has_value());
+        UNIT_ASSERT(!id2.has_value());
+        UNIT_ASSERT(!id3.has_value());
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeManyMixed) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise<int>();
+        auto p2 = NewPromise<int>();
+        auto f = MakeFuture(42);
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(f, [&callCount3](auto&&) { ++callCount3; } );
+
+        UNIT_ASSERT(id1.has_value());
+        UNIT_ASSERT(id2.has_value());
+        UNIT_ASSERT(!id3.has_value());
+        UNIT_ASSERT_UNEQUAL(id1.value(), id2.value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+
+        p1.SetValue(45);
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+
+        p2.SetValue(-7);
+        UNIT_ASSERT_EQUAL(callCount1, 1);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+    }
+
+    Y_UNIT_TEST(TestSubscribeUnsubscribeMany) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise<int>();
+        auto p2 = NewPromise<int>();
+        auto p3 = NewPromise<int>();
+
+        size_t callCount1 = 0;
+        auto id1 = m->Subscribe(p1.GetFuture(), [&callCount1](auto&&) { ++callCount1; } );
+        size_t callCount2 = 0;
+        auto id2 = m->Subscribe(p2.GetFuture(), [&callCount2](auto&&) { ++callCount2; } );
+        size_t callCount3 = 0;
+        auto id3 = m->Subscribe(p3.GetFuture(), [&callCount3](auto&&) { ++callCount3; } );
+        size_t callCount4 = 0;
+        auto id4 = m->Subscribe(p2.GetFuture(), [&callCount4](auto&&) { ++callCount4; } );
+        size_t callCount5 = 0;
+        auto id5 = m->Subscribe(p1.GetFuture(), [&callCount5](auto&&) { ++callCount5; } );
+
+        UNIT_ASSERT(id1.has_value());
+        UNIT_ASSERT(id2.has_value());
+        UNIT_ASSERT(id3.has_value());
+        UNIT_ASSERT(id4.has_value());
+        UNIT_ASSERT(id5.has_value());
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+        UNIT_ASSERT_EQUAL(callCount4, 0);
+        UNIT_ASSERT_EQUAL(callCount5, 0);
+
+        m->Unsubscribe(id1.value());
+        p1.SetValue(-1);
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 0);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+        UNIT_ASSERT_EQUAL(callCount4, 0);
+        UNIT_ASSERT_EQUAL(callCount5, 1);
+
+        m->Unsubscribe(id4.value());
+        p2.SetValue(23);
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 0);
+        UNIT_ASSERT_EQUAL(callCount4, 0);
+        UNIT_ASSERT_EQUAL(callCount5, 1);
+
+        p3.SetValue(100500);
+        UNIT_ASSERT_EQUAL(callCount1, 0);
+        UNIT_ASSERT_EQUAL(callCount2, 1);
+        UNIT_ASSERT_EQUAL(callCount3, 1);
+        UNIT_ASSERT_EQUAL(callCount4, 0);
+        UNIT_ASSERT_EQUAL(callCount5, 1);
+    }
+
+    Y_UNIT_TEST(TestBulkSubscribeManyUnsignaled) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise<int>();
+        auto p2 = NewPromise<int>();
+
+        size_t callCount = 0;
+        auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), p1.GetFuture() }, [&callCount](auto&&) { ++callCount; });
+
+        UNIT_ASSERT_EQUAL(ids.size(), 3);
+        UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+        UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+        UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        p1.SetValue(33);
+        UNIT_ASSERT_EQUAL(callCount, 2);
+
+        p2.SetValue(111);
+        UNIT_ASSERT_EQUAL(callCount, 3);
+    }
+
+    Y_UNIT_TEST(TestBulkSubscribeManySignaledNoRevert) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto f1 = MakeFuture(0);
+        auto f2 = MakeFuture(1);
+
+        size_t callCount = 0;
+        auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; });
+
+        UNIT_ASSERT_EQUAL(ids.size(), 3);
+        UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+        UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+        UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+        UNIT_ASSERT_EQUAL(callCount, 3);
+    }
+
+    Y_UNIT_TEST(TestBulkSubscribeManySignaledRevert) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto f1 = MakeFuture(0);
+        auto f2 = MakeFuture(1);
+
+        size_t callCount = 0;
+        auto ids = m->Subscribe({ f1, f2, f1 }, [&callCount](auto&&) { ++callCount; }, true);
+
+        UNIT_ASSERT(ids.empty());
+        UNIT_ASSERT_EQUAL(callCount, 1);
+    }
+
+    Y_UNIT_TEST(TestBulkSubscribeManyMixedNoRevert) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise<int>();
+        auto p2 = NewPromise<int>();
+        auto f = MakeFuture(42);
+
+        size_t callCount = 0;
+        auto ids = m->Subscribe({ p1.GetFuture(), p2.GetFuture(), f }, [&callCount](auto&&) { ++callCount; } );
+
+        UNIT_ASSERT_EQUAL(ids.size(), 3);
+        UNIT_ASSERT_UNEQUAL(ids[0], ids[1]);
+        UNIT_ASSERT_UNEQUAL(ids[1], ids[2]);
+        UNIT_ASSERT_UNEQUAL(ids[2], ids[0]);
+        UNIT_ASSERT_EQUAL(callCount, 1);
+
+        p1.SetValue(45);
+        UNIT_ASSERT_EQUAL(callCount, 2);
+
+        p2.SetValue(-7);
+        UNIT_ASSERT_EQUAL(callCount, 3);
+    }
+
+    Y_UNIT_TEST(TestBulkSubscribeManyMixedRevert) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise();
+        auto p2 = NewPromise();
+        auto f = MakeFuture();
+
+        size_t callCount = 0;
+        auto ids = m->Subscribe({ p1.GetFuture(), f, p2.GetFuture() }, [&callCount](auto&&) { ++callCount; }, true);
+
+        UNIT_ASSERT(ids.empty());
+        UNIT_ASSERT_EQUAL(callCount, 1);
+
+        p1.SetValue();
+        p2.SetValue();
+        UNIT_ASSERT_EQUAL(callCount, 1);
+    }
+
+    Y_UNIT_TEST(TestBulkSubscribeUnsubscribeMany) {
+        auto m = TSubscriptionManager::NewInstance();
+        auto p1 = NewPromise<int>();
+        auto p2 = NewPromise<int>();
+        auto p3 = NewPromise<int>();
+
+        size_t callCount = 0;
+        auto ids = m->Subscribe(
+                        TVector<TFuture<int>>{ p1.GetFuture(), p2.GetFuture(), p3.GetFuture(), p2.GetFuture(), p1.GetFuture() }
+                        , [&callCount](auto&&) { ++callCount; } );
+
+        UNIT_ASSERT_EQUAL(ids.size(), 5);
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        m->Unsubscribe(TVector<TSubscriptionId>{ ids[0], ids[3] });
+        UNIT_ASSERT_EQUAL(callCount, 0);
+
+        p1.SetValue(-1);
+        UNIT_ASSERT_EQUAL(callCount, 1);
+
+        p2.SetValue(23);
+        UNIT_ASSERT_EQUAL(callCount, 2);
+
+        p3.SetValue(100500);
+        UNIT_ASSERT_EQUAL(callCount, 3);
+    }
+}

+ 17 - 17
library/cpp/threading/future/subscription/ut/ya.make

@@ -1,17 +1,17 @@
-UNITTEST_FOR(library/cpp/threading/future/subscription) 
- 
-OWNER( 
-    g:kwyt 
-    g:rtmr 
-    ishfb 
-) 
- 
-SRCS( 
-    subscription_ut.cpp 
-    wait_all_ut.cpp 
-    wait_all_or_exception_ut.cpp 
-    wait_any_ut.cpp 
-    wait_ut_common.cpp 
-) 
- 
-END() 
+UNITTEST_FOR(library/cpp/threading/future/subscription)
+
+OWNER(
+    g:kwyt
+    g:rtmr
+    ishfb
+)
+
+SRCS(
+    subscription_ut.cpp
+    wait_all_ut.cpp
+    wait_all_or_exception_ut.cpp
+    wait_any_ut.cpp
+    wait_ut_common.cpp
+)
+
+END()

+ 119 - 119
library/cpp/threading/future/subscription/wait.h

@@ -1,119 +1,119 @@
-#pragma once 
- 
-#include "subscription.h" 
- 
-#include <util/generic/vector.h> 
-#include <util/generic/yexception.h> 
-#include <util/system/spinlock.h> 
- 
- 
-#include <initializer_list> 
- 
-namespace NThreading::NPrivate { 
- 
-template <typename TDerived> 
-class TWait : public TThrRefBase { 
-private: 
-    TSubscriptionManagerPtr Manager; 
-    TVector<TSubscriptionId> Subscriptions; 
-    bool Unsubscribed = false; 
- 
-protected: 
-    TAdaptiveLock Lock; 
-    TPromise<void> Promise; 
- 
-public: 
-    template <typename TFutures, typename TCallbackExecutor> 
-    static TFuture<void> Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { 
-        TIntrusivePtr<TDerived> w(new TDerived(std::move(manager))); 
-        w->Subscribe(futures, std::forward<TCallbackExecutor>(executor)); 
-        return w->Promise.GetFuture(); 
-    } 
- 
-protected: 
-    TWait(TSubscriptionManagerPtr manager) 
-        : Manager(std::move(manager)) 
-        , Subscriptions() 
-        , Unsubscribed(false) 
-        , Lock() 
-        , Promise(NewPromise()) 
-    { 
-        Y_ENSURE(Manager != nullptr); 
-    } 
- 
-protected: 
-    //! Unsubscribes all existing subscriptions 
-    /** Lock should be acquired! 
-    **/ 
-    void Unsubscribe() noexcept { 
-        if (Unsubscribed) { 
-            return; 
-        } 
-        Unsubscribe(Subscriptions); 
-        Subscriptions.clear(); 
-    } 
- 
-private: 
-    //! Performs a subscription to the given futures 
-    /** Lock should not be acquired! 
-        @param future - The futures to subscribe to 
-        @param callback - The callback to call for each future 
-    **/ 
-    template <typename TFutures, typename TCallbackExecutor> 
-    void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) { 
-        auto self = TIntrusivePtr<TDerived>(static_cast<TDerived*>(this)); 
-        self->BeforeSubscribe(futures); 
-        auto callback = [self = std::move(self)](const auto& future) mutable { 
-            self->Set(future); 
-        }; 
-        auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward<TCallbackExecutor>(executor)); 
-        if (subscriptions.empty()) { 
-            return; 
-        } 
-        with_lock (Lock) { 
-            if (Unsubscribed) { 
-                Unsubscribe(subscriptions); 
-            } else { 
-                Subscriptions = std::move(subscriptions); 
-            } 
-        } 
-    } 
- 
-    void Unsubscribe(TVector<TSubscriptionId>& subscriptions) noexcept { 
-        Manager->Unsubscribe(subscriptions); 
-        Unsubscribed = true; 
-    } 
-}; 
- 
-template <typename TWaiter, typename TFutures, typename TCallbackExecutor> 
-TFuture<void> Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { 
-    switch (std::size(futures)) { 
-        case 0: 
-            return MakeFuture(); 
-        case 1: 
-            return std::begin(futures)->IgnoreResult(); 
-        default: 
-            return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor)); 
-    } 
-} 
- 
-template <typename TWaiter, typename T, typename TCallbackExecutor> 
-TFuture<void> Wait(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { 
-    switch (std::size(futures)) { 
-        case 0: 
-            return MakeFuture(); 
-        case 1: 
-            return std::begin(futures)->IgnoreResult(); 
-        default: 
-            return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor)); 
-    } 
-} 
- 
- 
-template <typename TWaiter, typename T, typename TCallbackExecutor> 
-TFuture<void> Wait(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) { 
-    return TWaiter::Make(std::initializer_list<TFuture<T> const>({ future1, future2 }), std::move(manager) 
-                            , std::forward<TCallbackExecutor>(executor)); 
-} 
- 
-} 
+#pragma once
+
+#include "subscription.h"
+
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+#include <util/system/spinlock.h>
+
+
+#include <initializer_list>
+
+namespace NThreading::NPrivate {
+
+template <typename TDerived>
+class TWait : public TThrRefBase {
+private:
+    TSubscriptionManagerPtr Manager;
+    TVector<TSubscriptionId> Subscriptions;
+    bool Unsubscribed = false;
+
+protected:
+    TAdaptiveLock Lock;
+    TPromise<void> Promise;
+
+public:
+    template <typename TFutures, typename TCallbackExecutor>
+    static TFuture<void> Make(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+        TIntrusivePtr<TDerived> w(new TDerived(std::move(manager)));
+        w->Subscribe(futures, std::forward<TCallbackExecutor>(executor));
+        return w->Promise.GetFuture();
+    }
+
+protected:
+    TWait(TSubscriptionManagerPtr manager)
+        : Manager(std::move(manager))
+        , Subscriptions()
+        , Unsubscribed(false)
+        , Lock()
+        , Promise(NewPromise())
+    {
+        Y_ENSURE(Manager != nullptr);
+    }
+
+protected:
+    //! Unsubscribes all existing subscriptions
+    /** Lock should be acquired!
+    **/
+    void Unsubscribe() noexcept {
+        if (Unsubscribed) {
+            return;
+        }
+        Unsubscribe(Subscriptions);
+        Subscriptions.clear();
+    }
+
+private:
+    //! Performs a subscription to the given futures
+    /** Lock should not be acquired!
+        @param future - The futures to subscribe to
+        @param callback - The callback to call for each future
+    **/
+    template <typename TFutures, typename TCallbackExecutor>
+    void Subscribe(TFutures const& futures, TCallbackExecutor&& executor) {
+        auto self = TIntrusivePtr<TDerived>(static_cast<TDerived*>(this));
+        self->BeforeSubscribe(futures);
+        auto callback = [self = std::move(self)](const auto& future) mutable {
+            self->Set(future);
+        };
+        auto subscriptions = Manager->Subscribe(futures, callback, TDerived::RevertOnSignaled, std::forward<TCallbackExecutor>(executor));
+        if (subscriptions.empty()) {
+            return;
+        }
+        with_lock (Lock) {
+            if (Unsubscribed) {
+                Unsubscribe(subscriptions);
+            } else {
+                Subscriptions = std::move(subscriptions);
+            }
+        }
+    }
+
+    void Unsubscribe(TVector<TSubscriptionId>& subscriptions) noexcept {
+        Manager->Unsubscribe(subscriptions);
+        Unsubscribed = true;
+    }
+};
+
+template <typename TWaiter, typename TFutures, typename TCallbackExecutor>
+TFuture<void> Wait(TFutures const& futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+    switch (std::size(futures)) {
+        case 0:
+            return MakeFuture();
+        case 1:
+            return std::begin(futures)->IgnoreResult();
+        default:
+            return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+    }
+}
+
+template <typename TWaiter, typename T, typename TCallbackExecutor>
+TFuture<void> Wait(std::initializer_list<TFuture<T> const> futures, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+    switch (std::size(futures)) {
+        case 0:
+            return MakeFuture();
+        case 1:
+            return std::begin(futures)->IgnoreResult();
+        default:
+            return TWaiter::Make(futures, std::move(manager), std::forward<TCallbackExecutor>(executor));
+    }
+}
+
+
+template <typename TWaiter, typename T, typename TCallbackExecutor>
+TFuture<void> Wait(TFuture<T> const& future1, TFuture<T> const& future2, TSubscriptionManagerPtr manager, TCallbackExecutor&& executor) {
+    return TWaiter::Make(std::initializer_list<TFuture<T> const>({ future1, future2 }), std::move(manager)
+                            , std::forward<TCallbackExecutor>(executor));
+}
+
+}

Some files were not shown because too many files changed in this diff