Browse Source

Intermediate changes

robot-piglet 1 year ago
parent
commit
5e3cebce04

+ 3 - 3
library/cpp/yt/containers/unittests/sharded_set_ut.cpp

@@ -21,7 +21,7 @@ using TSet = TShardedSet<int, 16, TIntToShard>;
 
 ////////////////////////////////////////////////////////////////////////////////
 
-TEST(CompactSetTest, Insert)
+TEST(TShardedSetTest, Insert)
 {
     TSet set;
 
@@ -41,7 +41,7 @@ TEST(CompactSetTest, Insert)
     EXPECT_EQ(0u, set.count(4));
 }
 
-TEST(CompactSetTest, Erase)
+TEST(TShardedSetTest, Erase)
 {
     TSet set;
 
@@ -65,7 +65,7 @@ TEST(CompactSetTest, Erase)
     EXPECT_EQ(0u, set.count(8));
 }
 
-TEST(CompactSetTest, StressTest)
+TEST(TShardedSetTest, StressTest)
 {
     TSet set;
 

+ 6 - 0
yt/yt/core/bus/tcp/connection.cpp

@@ -793,6 +793,12 @@ void TTcpConnection::UnsubscribeTerminated(const TCallback<void(const TError&)>&
 
 void TTcpConnection::OnEvent(EPollControl control)
 {
+    auto multiplexingBand = MultiplexingBand_.load();
+    if (multiplexingBand != ActualMultiplexingBand_) {
+        Poller_->SetExecutionPool(this, FormatEnum(multiplexingBand));
+        ActualMultiplexingBand_ = multiplexingBand;
+    }
+
     EPollControl action;
     {
         auto rawPendingControl = PendingControl_.load(std::memory_order::acquire);

+ 2 - 0
yt/yt/core/bus/tcp/connection.h

@@ -224,6 +224,8 @@ private:
 
     std::atomic<EMultiplexingBand> MultiplexingBand_ = EMultiplexingBand::Default;
 
+    EMultiplexingBand ActualMultiplexingBand_ = EMultiplexingBand::Default;
+
     TAtomicObject<TError> Error_;
 
     NNet::IAsyncDialerSessionPtr DialerSession_;

+ 4 - 1
yt/yt/core/concurrency/poller.h

@@ -82,7 +82,10 @@ struct IPoller
 
     //! Tries to register a pollable entity but does not arm the poller yet.
     //! Returns |false| if the poller is already shutting down.
-    virtual bool TryRegister(const IPollablePtr& pollable) = 0;
+    virtual bool TryRegister(const IPollablePtr& pollable, TString poolName = "default") = 0;
+
+    //! Method must be called inside OnEvent.
+    virtual void SetExecutionPool(const IPollablePtr& pollable, TString poolName) = 0;
 
     //! Unregisters the previously registered entity.
     /*!

+ 86 - 72
yt/yt/core/concurrency/thread_pool_poller.cpp

@@ -1,9 +1,6 @@
-#include "thread_pool.h"
 #include "poller.h"
 #include "thread_pool_poller.h"
 #include "private.h"
-#include "profiling_helpers.h"
-#include "scheduler_thread.h"
 #include "two_level_fair_share_thread_pool.h"
 #include "new_fair_share_thread_pool.h"
 
@@ -13,6 +10,8 @@
 
 #include <yt/yt/core/profiling/tscp.h>
 
+#include <yt/yt/core/threading/thread.h>
+
 #include <library/cpp/yt/threading/notification_handle.h>
 
 #include <library/cpp/yt/memory/ref_tracked.h>
@@ -40,10 +39,16 @@ class TThreadPoolPoller;
 
 namespace {
 
+DEFINE_ENUM(EFinishResult,
+    (None)
+    (Repeat)
+    (Shutdown)
+);
+
 class TCookieState
 {
 public:
-    // AquireControl is called from poller thread.
+    // AquireControl is called from poller thread and from Retry.
     bool AquireControl(ui32 control)
     {
         auto currentState = State_.load();
@@ -55,19 +60,17 @@ public:
             if ((static_cast<ui32>(currentState) & control) == control) {
                 return false;
             }
-        } while (!State_.compare_exchange_weak(currentState, (currentState | static_cast<ui64>(control)) + RefValue));
+        } while (!State_.compare_exchange_weak(currentState, currentState | static_cast<ui64>(control) | RunningFlag));
 
-        return true;
+        return !(currentState & RunningFlag);
     }
 
-    void ResetControl(ui32 control)
+    // Resets control and returns previous value.
+    ui32 ResetControl()
     {
         auto currentState = State_.load();
-        do {
-            if (!(static_cast<ui32>(currentState) & control)) {
-                break;
-            }
-        } while (!State_.compare_exchange_weak(currentState, currentState & ~static_cast<ui64>(control)));
+        while (!State_.compare_exchange_weak(currentState, currentState & (UnregisterFlag | RunningFlag)));
+        return static_cast<ui32>(currentState);
     }
 
     // Returns destroy flag.
@@ -81,29 +84,36 @@ public:
             }
         } while (!State_.compare_exchange_weak(currentState, currentState | UnregisterFlag));
 
-        // No refs.
-        return currentState == 0;
+        return !(currentState & RunningFlag);
     }
 
-    // Returns destroy flag.
-    bool ReleaseRef()
+    EFinishResult Finish()
     {
-        auto prevState = State_.fetch_sub(RefValue);
+        auto currentState = State_.load();
 
-        YT_VERIFY(prevState >= RefValue);
-        auto currentState = prevState - RefValue;
-        if ((currentState >> ControlShift) == 1) {
-            // Verify that control flags are empty when there are no refs and unregister flag is set.
-            YT_VERIFY(static_cast<ui32>(currentState) == 0);
-        }
+        YT_VERIFY(currentState & RunningFlag);
+
+        do {
+            if (currentState & UnregisterFlag) {
+                // Run destroy.
+                return EFinishResult::Shutdown;
+            }
+
+            if (currentState & ~(UnregisterFlag | RunningFlag)) {
+                // Has state. Retry.
+                return EFinishResult::Repeat;
+            }
+
+        } while (!State_.compare_exchange_weak(currentState, currentState & ~RunningFlag));
 
-        return prevState == (RefValue | UnregisterFlag);
+        return EFinishResult::None;
     }
 
 private:
     static constexpr auto ControlShift = sizeof(ui32) * 8;
     static constexpr ui64 UnregisterFlag = 1ULL << ControlShift;
-    static constexpr ui64 RefValue = UnregisterFlag * 2;
+    static constexpr ui64 RunningFlag = 1ULL << (ControlShift + 1);
+
     // No contention expected when accessing this atomic variable.
     // So we can safely (regarding to performance) use CAS.
     std::atomic<ui64> State_ = 0;
@@ -139,6 +149,7 @@ EContPoll ToImplControl(EPollControl control)
 {
     int implControl = CONT_POLL_ONE_SHOT;
     if (Any(control & EPollControl::EdgeTriggered)) {
+        // N.B. Edge-triggered mode disables one shot mode.
         implControl = CONT_POLL_EDGE_TRIGGERED;
     }
     if (Any(control & EPollControl::BacklogEmpty)) {
@@ -204,7 +215,7 @@ public:
         FairShareThreadPool_->Configure(threadCount);
     }
 
-    bool TryRegister(const IPollablePtr& pollable) override
+    bool TryRegister(const IPollablePtr& pollable, TString poolName) override
     {
         // FIXME(lukyan): Enqueueing in register queue may happen after stopping.
         // Create cookie when dequeueing from register queue?
@@ -214,7 +225,9 @@ public:
         }
 
         auto cookie = New<TPollableCookie>(this);
-        cookie->Invoker = FairShareThreadPool_->GetInvoker("main", Format("%v", pollable.Get()));
+        cookie->Invoker = FairShareThreadPool_->GetInvoker(
+            poolName,
+            Format("%v", pollable.Get()));
         pollable->SetCookie(std::move(cookie));
         RegisterQueue_.Enqueue(pollable);
 
@@ -224,6 +237,14 @@ public:
         return true;
     }
 
+    void SetExecutionPool(const IPollablePtr& pollable, TString poolName) override
+    {
+        auto* cookie = TPollableCookie::FromPollable(pollable.Get());
+        cookie->Invoker = FairShareThreadPool_->GetInvoker(
+            poolName,
+            Format("%v", pollable.Get()));
+    }
+
     // TODO(lukyan): Method OnShutdown in the interface and returned future are redundant.
     // Shutdown can be done by subscribing returned future or some promise can be set inside OnShutdown.
     TFuture<void> Unregister(const IPollablePtr& pollable) override
@@ -257,10 +278,7 @@ public:
 
     void Retry(const IPollablePtr& pollable) override
     {
-        if (auto guard = TryAcquireRunEventGuard(pollable.Get(), EPollControl::Retry)) {
-            auto* cookie = TPollableCookie::FromPollable(pollable.Get());
-            cookie->Invoker->Invoke(BIND(std::move(guard)));
-        }
+        ScheduleEvent(pollable, EPollControl::Retry);
     }
 
     IInvokerPtr GetInvoker() const override
@@ -274,34 +292,17 @@ public:
     }
 
 private:
-    static void DoShutdownPollable(TPollableCookie* cookie, IPollable* pollable)
-    {
-        // Poller guarantees that OnShutdown is never executed concurrently with OnEvent().
-        // Otherwise it will be removed in TRunEventGuard.
-        RunNoExcept([&] {
-            pollable->OnShutdown();
-        });
-
-        cookie->UnregisterPromise.Set();
-        cookie->Invoker.Reset();
-        auto pollerThread = std::move(cookie->PollerThread);
-        pollerThread->UnregisterQueue_.Enqueue(pollable);
-        pollerThread->WakeupHandle_.Raise();
-    }
-
     class TRunEventGuard
     {
     public:
         TRunEventGuard() = default;
 
-        TRunEventGuard(IPollable* pollable, EPollControl control)
+        explicit TRunEventGuard(IPollable* pollable)
             : Pollable_(pollable)
-            , Control_(control)
         { }
 
         explicit TRunEventGuard(TRunEventGuard&& other)
             : Pollable_(std::move(other.Pollable_))
-            , Control_(std::move(other.Control_))
         {
             other.Pollable_ = nullptr;
         }
@@ -318,34 +319,38 @@ private:
             }
 
             auto* cookie = TPollableCookie::FromPollable(Pollable_);
-            cookie->ResetControl(ToUnderlying(Control_));
+            cookie->ResetControl();
             Destroy(Pollable_);
         }
 
-        explicit operator bool() const
-        {
-            return static_cast<bool>(Pollable_);
-        }
-
         void operator()()
         {
             auto* cookie = TPollableCookie::FromPollable(Pollable_);
-            cookie->ResetControl(ToUnderlying(Control_));
-
-            Pollable_->OnEvent(Control_);
+            auto control = EPollControl(cookie->ResetControl());
+            RunNoExcept([&] {
+                Pollable_->OnEvent(control);
+            });
             Destroy(Pollable_);
             Pollable_ = nullptr;
         }
 
     private:
         IPollable* Pollable_ = nullptr;
-        EPollControl Control_ = EPollControl::None;
 
         static void Destroy(IPollable* pollable)
         {
             auto* cookie = TPollableCookie::FromPollable(pollable);
-            if (cookie->ReleaseRef()) {
-                DoShutdownPollable(cookie, pollable);
+
+            auto result = cookie->Finish();
+            switch (result) {
+                case EFinishResult::Shutdown:
+                    DoShutdownPollable(cookie, pollable);
+                    break;
+                case EFinishResult::Repeat:
+                    cookie->Invoker->Invoke(BIND(TRunEventGuard(pollable)));
+                    break;
+                case EFinishResult::None:
+                    break;
             }
         }
     };
@@ -371,16 +376,29 @@ private:
 
     std::array<TPollerImpl::TEvent, MaxEventsPerPoll> PooledImplEvents_;
 
-    static TRunEventGuard TryAcquireRunEventGuard(IPollable* pollable, EPollControl control)
+    // TODO(lukyan): Move static functions in Cookie?
+    static void ScheduleEvent(const IPollablePtr& pollable, EPollControl control)
     {
-        auto* cookie = TPollableCookie::FromPollable(pollable);
-        YT_VERIFY(cookie->GetRefCount() > 0);
-
+        // Can safely dereference pollable because even unregistered pollables are hold in Pollables_.
+        auto* cookie = TPollableCookie::FromPollable(pollable.Get());
         if (cookie->AquireControl(ToUnderlying(control))) {
-            return {pollable, control};
+            cookie->Invoker->Invoke(BIND(TRunEventGuard(pollable.Get())));
         }
+    }
 
-        return {};
+    static void DoShutdownPollable(TPollableCookie* cookie, IPollable* pollable)
+    {
+        // Poller guarantees that OnShutdown is never executed concurrently with OnEvent().
+        // Otherwise it will be removed in TRunEventGuard.
+        RunNoExcept([&] {
+            pollable->OnShutdown();
+        });
+
+        cookie->UnregisterPromise.Set();
+        cookie->Invoker.Reset();
+        auto pollerThread = std::move(cookie->PollerThread);
+        pollerThread->UnregisterQueue_.Enqueue(pollable);
+        pollerThread->WakeupHandle_.Raise();
     }
 
     void DoUnregister(const IPollablePtr& pollable)
@@ -415,11 +433,7 @@ private:
 
             YT_VERIFY(pollable->GetRefCount() > 0);
 
-            // Can safely dereference pollable because even unregistered pollables are hold in Pollables_.
-            if (auto guard = TryAcquireRunEventGuard(pollable, control)) {
-                auto* cookie = TPollableCookie::FromPollable(pollable);
-                cookie->Invoker->Invoke(BIND(std::move(guard)));
-            }
+            ScheduleEvent(pollable, control);
         }
     }