Browse Source

Introduce distributed configuration KIKIMR-19031

alexvru 1 year ago
parent
commit
8d15e67485

+ 4 - 1
library/cpp/actors/core/interconnect.h

@@ -195,6 +195,7 @@ namespace NActors {
             TString ResolveHost;
             ui16 Port;
             TNodeLocation Location;
+            bool IsStatic = true;
 
             TNodeInfo() = default;
             TNodeInfo(const TNodeInfo&) = default;
@@ -204,13 +205,15 @@ namespace NActors {
                       const TString& host,
                       const TString& resolveHost,
                       ui16 port,
-                      const TNodeLocation& location)
+                      const TNodeLocation& location,
+                      bool isStatic = true)
                 : NodeId(nodeId)
                 , Address(address)
                 , Host(host)
                 , ResolveHost(resolveHost)
                 , Port(port)
                 , Location(location)
+                , IsStatic(isStatic)
             {
             }
 

+ 3 - 0
ydb/core/base/blobstorage.h

@@ -847,6 +847,9 @@ struct TEvBlobStorage {
         EvRestartPDiskResult,
         EvNodeWardenQueryGroupInfo,
         EvNodeWardenGroupInfo,
+        EvNodeConfigPush,
+        EvNodeConfigReversePush,
+        EvNodeConfigUnbind,
 
         // Other
         EvRunActor = EvPut + 15 * 512,

+ 1 - 0
ydb/core/blobstorage/nodewarden/CMakeLists.darwin-x86_64.txt

@@ -25,6 +25,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
 target_sources(core-blobstorage-nodewarden PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

+ 1 - 0
ydb/core/blobstorage/nodewarden/CMakeLists.linux-aarch64.txt

@@ -26,6 +26,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
 target_sources(core-blobstorage-nodewarden PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

+ 1 - 0
ydb/core/blobstorage/nodewarden/CMakeLists.linux-x86_64.txt

@@ -26,6 +26,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
 target_sources(core-blobstorage-nodewarden PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

+ 1 - 0
ydb/core/blobstorage/nodewarden/CMakeLists.windows-x86_64.txt

@@ -25,6 +25,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
 target_sources(core-blobstorage-nodewarden PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/group_stat_aggregator.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_cache.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_group_resolver.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

+ 161 - 0
ydb/core/blobstorage/nodewarden/bind_queue.h

@@ -0,0 +1,161 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NStorage {
+
+    class TBindQueue {
+        struct TItem {
+            ui32 NodeId;
+            TMonotonic NextTryTimestamp = TMonotonic::Zero();
+
+            TItem(ui32 nodeId)
+                : NodeId(nodeId)
+            {}
+        };
+
+        // BindQueue is arranged in the following way:
+        // <active items> ActiveEnd <processed items> ProcessedEnd <disabled items> end()
+        std::vector<TItem> BindQueue;
+        size_t ActiveEnd = 0;
+        size_t ProcessedEnd = 0;
+        THashMap<ui32, size_t> NodeIdToBindQueue;
+
+#ifndef NDEBUG
+        THashSet<ui32> Enabled, Disabled;
+#endif
+
+    public:
+        void Disable(ui32 nodeId) {
+#ifndef NDEBUG
+            Y_VERIFY(Enabled.contains(nodeId));
+            Enabled.erase(nodeId);
+            Disabled.insert(nodeId);
+#endif
+
+            const auto it = NodeIdToBindQueue.find(nodeId);
+            Y_VERIFY_S(it != NodeIdToBindQueue.end(), "NodeId# " << nodeId);
+            size_t index = it->second;
+
+            // ensure item is not yet disabled
+            Y_VERIFY(index < ProcessedEnd);
+
+            // if item is active, move it to processed as a transit stage
+            if (index < ActiveEnd) {
+                Swap(index, --ActiveEnd);
+                index = ActiveEnd;
+            }
+
+            // move it to disabled
+            Swap(index, --ProcessedEnd);
+        }
+
+        void Enable(ui32 nodeId) {
+#ifndef NDEBUG
+            Y_VERIFY(Disabled.contains(nodeId));
+            Disabled.erase(nodeId);
+            Enabled.insert(nodeId);
+#endif
+
+            const auto it = NodeIdToBindQueue.find(nodeId);
+            Y_VERIFY(it != NodeIdToBindQueue.end());
+            size_t index = it->second;
+
+            // ensure item is disabled
+            Y_VERIFY(ProcessedEnd <= index);
+
+            // move it back to processed and then to active
+            Swap(index, ProcessedEnd++);
+            Swap(ProcessedEnd - 1, ActiveEnd++);
+        }
+
+        std::optional<ui32> Pick(TMonotonic now, TMonotonic *closest) {
+            // scan through processed items and find matching if there are no active items
+            if (!ActiveEnd) {
+                for (size_t k = 0; k < ProcessedEnd; ++k) {
+                    if (BindQueue[k].NextTryTimestamp <= now) {
+                        // make it active
+                        Swap(k, ActiveEnd++);
+                    }
+                }
+            }
+
+            // pick a random item from Active set, if there is any
+            if (ActiveEnd) {
+                const size_t index = RandomNumber(ActiveEnd);
+                const ui32 nodeId = BindQueue[index].NodeId;
+                BindQueue[index].NextTryTimestamp = now + TDuration::Seconds(1);
+                Swap(index, --ActiveEnd); // move item to processed
+                return nodeId;
+            } else {
+                *closest = TMonotonic::Max();
+                for (size_t k = ActiveEnd; k < ProcessedEnd; ++k) {
+                    *closest = Min(*closest, BindQueue[k].NextTryTimestamp);
+                }
+                return std::nullopt;
+            }
+        }
+
+        void Update(const std::vector<ui32>& nodeIds) {
+            // remember node ids that are still pending
+            THashSet<ui32> processedNodeIds;
+            THashSet<ui32> disabledNodeIds;
+            for (size_t k = 0; k < BindQueue.size(); ++k) {
+                if (ActiveEnd <= k && k < ProcessedEnd) {
+                    processedNodeIds.insert(BindQueue[k].NodeId);
+                } else if (ProcessedEnd <= k) {
+                    disabledNodeIds.insert(BindQueue[k].NodeId);
+                }
+            }
+
+            // create a set of available node ids (excluding this one)
+            THashSet<ui32> nodeIdsSet(nodeIds.begin(), nodeIds.end());
+
+            // remove deleted nodes from the BindQueue and filter our existing ones in nodeIdsSet
+            auto removePred = [&](const TItem& item) {
+                if (nodeIdsSet.erase(item.NodeId)) {
+                    // there is such item in nodeIdsSet, so keep it in BindQueue
+                    return false;
+                } else {
+                    // item has vanished, remove it from BindQueue and NodeIdToBindQueue map
+                    NodeIdToBindQueue.erase(item.NodeId);
+#ifndef NDEBUG
+                    Enabled.erase(item.NodeId);
+                    Disabled.erase(item.NodeId);
+#endif
+                    return true;
+                }
+            };
+            BindQueue.erase(std::remove_if(BindQueue.begin(), BindQueue.end(), removePred), BindQueue.end());
+            for (const ui32 nodeId : nodeIdsSet) {
+                BindQueue.emplace_back(nodeId);
+#ifndef NDEBUG
+                Enabled.insert(nodeId);
+#endif
+            }
+
+            // move known disabled nodes to the end
+            auto disabledPred = [&](const TItem& item) { return !disabledNodeIds.contains(item.NodeId); };
+            ProcessedEnd = std::partition(BindQueue.begin(), BindQueue.end(), disabledPred) - BindQueue.begin();
+
+            // rearrange active and processed nodes -- keep processed ones in place, new nodes are added as active
+            auto processedPred = [&](const TItem& item) { return !processedNodeIds.contains(item.NodeId); };
+            ActiveEnd = std::partition(BindQueue.begin(), BindQueue.begin() + ProcessedEnd, processedPred) - BindQueue.begin();
+
+            // update revmap
+            NodeIdToBindQueue.clear();
+            for (size_t k = 0; k < BindQueue.size(); ++k) {
+                NodeIdToBindQueue[BindQueue[k].NodeId] = k;
+            }
+        }
+
+    private:
+        void Swap(size_t x, size_t y) {
+            if (x != y) {
+                std::swap(BindQueue[x], BindQueue[y]);
+                std::swap(NodeIdToBindQueue[BindQueue[x].NodeId], NodeIdToBindQueue[BindQueue[y].NodeId]);
+            }
+        }
+    };
+
+} // NKikimr::NStorage

+ 93 - 0
ydb/core/blobstorage/nodewarden/bind_queue_ut.cpp

@@ -0,0 +1,93 @@
+#include "bind_queue.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+using namespace NKikimr;
+using namespace NKikimr::NStorage;
+
+Y_UNIT_TEST_SUITE(BindQueue) {
+    Y_UNIT_TEST(Basic) {
+        TBindQueue bindQueue;
+
+        std::vector<ui32> nodes;
+        for (ui32 i = 1; i <= 100; ++i) {
+            nodes.push_back(i);
+        }
+
+        ui32 nextNodeId = nodes.size() + 1;
+
+        std::vector<ui32> enabled(nodes.begin(), nodes.end());
+        std::vector<ui32> disabled;
+
+        THashSet<ui32> enabledSet(enabled.begin(), enabled.end());
+
+        bindQueue.Update(nodes);
+
+        TMonotonic now;
+
+        for (ui32 iter = 0; iter < 100000; ++iter) {
+            const bool canEnable = !disabled.empty();
+            const bool canDisable = !enabled.empty();
+            const bool canAddNode = nodes.size() < 100;
+            const bool canDeleteNode = nodes.size() >= 10;
+            const bool canPick = true;
+
+            const ui32 w = canEnable + canDisable + canAddNode + canDeleteNode + canPick;
+
+            ui32 i = RandomNumber(w);
+
+            if (canEnable && !i--) {
+                const size_t index = RandomNumber(disabled.size());
+                const ui32 nodeId = disabled[index];
+                Cerr << "Enable nodeId# " << nodeId << Endl;
+                std::swap(disabled[index], disabled.back());
+                disabled.pop_back();
+                enabled.push_back(nodeId);
+                enabledSet.insert(nodeId);
+                bindQueue.Enable(nodeId);
+            } else if (canDisable && !i--) {
+                const size_t index = RandomNumber(enabled.size());
+                const ui32 nodeId = enabled[index];
+                Cerr << "Disable nodeId# " << nodeId << Endl;
+                std::swap(enabled[index], enabled.back());
+                enabled.pop_back();
+                enabledSet.erase(nodeId);
+                disabled.push_back(nodeId);
+                bindQueue.Disable(nodeId);
+            } else if (canAddNode && !i--) {
+                Cerr << "Add nodeId# " << nextNodeId << Endl;
+                nodes.push_back(nextNodeId);
+                enabled.push_back(nextNodeId);
+                enabledSet.insert(nextNodeId);
+                bindQueue.Update(nodes);
+                ++nextNodeId;
+            } else if (canDeleteNode && !i--) {
+                const size_t index = RandomNumber(nodes.size());
+                const ui32 nodeId = nodes[index];
+                Cerr << "Delete nodeId# " << nodeId << Endl;
+                std::swap(nodes[index], nodes.back());
+                nodes.pop_back();
+                if (const auto it = std::find(enabled.begin(), enabled.end(), nodeId); it != enabled.end()) {
+                    std::swap(*it, enabled.back());
+                    enabled.pop_back();
+                    enabledSet.erase(nodeId);
+                } else if (const auto it = std::find(disabled.begin(), disabled.end(), nodeId); it != disabled.end()) {
+                    std::swap(*it, disabled.back());
+                    disabled.pop_back();
+                } else {
+                    UNIT_FAIL("unexpected case");
+                }
+                bindQueue.Update(nodes);
+            } else if (canPick && !i--) {
+                Cerr << "Pick" << Endl;
+                THashSet<ui32> picked;
+                TMonotonic closest;
+                while (const auto res = bindQueue.Pick(now, &closest)) {
+                    UNIT_ASSERT(picked.insert(*res).second);
+                }
+                UNIT_ASSERT_VALUES_EQUAL(picked, enabledSet);
+                now += TDuration::Seconds(1);
+            }
+        }
+    }
+}

+ 587 - 0
ydb/core/blobstorage/nodewarden/node_warden_distributed_config.cpp

@@ -0,0 +1,587 @@
+#include "node_warden_impl.h"
+#include "node_warden_events.h"
+#include "bind_queue.h"
+
+namespace NKikimr::NStorage {
+
+    class TDistributedConfigKeeper : public TActorBootstrapped<TDistributedConfigKeeper> {
+        struct TEvPrivate {
+            enum {
+               EvProcessPendingEvent = EventSpaceBegin(TEvents::ES_PRIVATE),
+            };
+        };
+
+        static constexpr ui64 OutgoingBindingCookie = 1;
+        static constexpr ui64 IncomingBindingCookie = 2;
+
+        struct TBinding {
+            ui32 NodeId; // we have direct binding to this node
+            ui32 RootNodeId; // this is the terminal node id for the whole binding chain
+            ui64 Cookie; // binding cookie within the session
+            TActorId SessionId; // session that connects to the node
+            std::vector<std::unique_ptr<IEventBase>> PendingEvents;
+
+            TBinding(ui32 nodeId, ui32 rootNodeId, ui64 cookie, TActorId sessionId)
+                : NodeId(nodeId)
+                , RootNodeId(rootNodeId)
+                , Cookie(cookie)
+                , SessionId(sessionId)
+            {}
+
+            bool Expected(IEventHandle& ev) const {
+                return NodeId == ev.Sender.NodeId()
+                    && Cookie == ev.Cookie
+                    && SessionId == ev.InterconnectSession;
+            }
+
+            TString ToString() const {
+                return TStringBuilder() << '{' << NodeId << '.' << RootNodeId << '/' << Cookie
+                    << '@' << SessionId << '}';
+            }
+        };
+
+        struct TBoundNode {
+            ui64 Cookie;
+            TActorId SessionId;
+            THashSet<ui32> BoundNodeIds;
+
+            TBoundNode(ui64 cookie, TActorId sessionId)
+                : Cookie(cookie)
+                , SessionId(sessionId)
+            {}
+        };
+
+        // current most relevant storage config
+        NKikimrBlobStorage::TStorageConfig StorageConfig;
+
+        // outgoing binding
+        std::optional<TBinding> Binding;
+        ui64 BindingCookie = RandomNumber<ui64>();
+        TBindQueue BindQueue;
+        ui32 NumPeerNodes = 0;
+        bool Scheduled = false;
+
+        // incoming bindings
+        THashMap<ui32, TBoundNode> DirectBoundNodes; // a set of nodes directly bound to this one
+        THashMap<ui32, ui32> AllBoundNodes; // counter may be more than 2 in case of races, but not for long
+
+        std::deque<TAutoPtr<IEventHandle>> PendingEvents;
+        std::vector<ui32> NodeIds;
+
+    public:
+        void Bootstrap() {
+            STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
+            StorageConfig.SetGeneration(1);
+            Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
+            Become(&TThis::StateWaitForList);
+        }
+
+        void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) {
+            STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");
+
+            // create a vector of peer static nodes
+            bool iAmStatic = false;
+            std::vector<ui32> nodeIds;
+            const ui32 selfNodeId = SelfId().NodeId();
+            for (const auto& item : ev->Get()->Nodes) {
+                if (item.NodeId == selfNodeId) {
+                    iAmStatic = item.IsStatic;
+                } else if (item.IsStatic) {
+                    nodeIds.push_back(item.NodeId);
+                }
+            }
+            std::sort(nodeIds.begin(), nodeIds.end());
+
+            // do not start configuration negotiation for dynamic nodes
+            if (!iAmStatic) {
+                Y_VERIFY(NodeIds.empty());
+                return;
+            }
+
+            // check if some nodes were deleted -- we have to unbind them
+            bool bindingReset = false;
+            bool changes = false;
+            for (auto prevIt = NodeIds.begin(), curIt = nodeIds.begin(); prevIt != NodeIds.end() || curIt != nodeIds.end(); ) {
+                if (prevIt == NodeIds.end() || *curIt < *prevIt) { // node added
+                    ++curIt;
+                    changes = true;
+                } else if (curIt == NodeIds.end() || *prevIt < *curIt) { // node deleted
+                    const ui32 nodeId = *prevIt++;
+                    UnbindNode(nodeId, true);
+                    if (Binding && Binding->NodeId == nodeId) {
+                        Binding.reset();
+                        bindingReset = true;
+                    }
+                    changes = true;
+                } else {
+                    Y_VERIFY(*prevIt == *curIt);
+                    ++prevIt;
+                    ++curIt;
+                }
+            }
+
+            if (!changes) {
+                return;
+            }
+
+            // issue updates
+            NodeIds = std::move(nodeIds);
+            BindQueue.Update(NodeIds);
+            NumPeerNodes = NodeIds.size() - 1;
+            IssueNextBindRequest();
+
+            if (bindingReset) {
+                for (const auto& [nodeId, info] : DirectBoundNodes) {
+                    SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(nullptr,
+                        GetRootNodeId()));
+                }
+            }
+        }
+
+        void UnsubscribeInterconnect(ui32 nodeId, TActorId sessionId) {
+            if (Binding && Binding->NodeId == nodeId) {
+                return;
+            }
+            if (DirectBoundNodes.contains(nodeId)) {
+                return;
+            }
+            TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, sessionId, SelfId(), nullptr, 0));
+        }
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Binding to peer nodes
+
+        void IssueNextBindRequest() {
+            if (!Binding && AllBoundNodes.size() + 1 /* including this one */ < NumPeerNodes) {
+                const TMonotonic now = TActivationContext::Monotonic();
+                TMonotonic closest;
+                if (std::optional<ui32> nodeId = BindQueue.Pick(now, &closest)) {
+                    Binding.emplace(*nodeId, 0, ++BindingCookie, TActorId());
+                    STLOG(PRI_DEBUG, BS_NODE, NWDC01, "Initiating bind", (Binding, Binding));
+                    TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
+                        TActivationContext::InterconnectProxy(Binding->NodeId), SelfId(), nullptr, OutgoingBindingCookie));
+                } else if (closest != TMonotonic::Max() && !Scheduled) {
+                    TActivationContext::Schedule(closest, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
+                    Scheduled = true;
+                }
+            }
+        }
+
+        void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
+            const ui32 nodeId = ev->Get()->NodeId;
+            STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId));
+
+            if (ev->Cookie == OutgoingBindingCookie) {
+                Y_VERIFY(Binding);
+                Y_VERIFY(Binding->NodeId == nodeId);
+                Binding->SessionId = ev->Sender;
+
+                // send any accumulated events generated while we were waiting for the connection
+                for (auto& ev : std::exchange(Binding->PendingEvents, {})) {
+                    SendEvent(*Binding, std::move(ev));
+                }
+
+                STLOG(PRI_DEBUG, BS_NODE, NWDC09, "Continuing bind", (Binding, Binding));
+                SendEvent(nodeId, Binding->Cookie, Binding->SessionId, std::make_unique<TEvNodeConfigPush>(&StorageConfig,
+                    AllBoundNodes));
+            }
+        }
+
+        void HandleWakeup() {
+            Y_VERIFY(Scheduled);
+            Scheduled = false;
+            IssueNextBindRequest();
+        }
+
+        void Handle(TEvNodeConfigReversePush::TPtr ev) {
+            const ui32 senderNodeId = ev->Sender.NodeId();
+            Y_VERIFY(senderNodeId != SelfId().NodeId());
+            auto& record = ev->Get()->Record;
+
+            STLOG(PRI_DEBUG, BS_NODE, NWDC17, "TEvNodeConfigReversePush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+                (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
+
+            if (Binding && Binding->Expected(*ev)) {
+                // check if this binding was accepted and if it is acceptable from our point of view
+                bool rejected = record.GetRejected();
+                const char *rejectReason = nullptr;
+                bool rootUpdated = false;
+                if (rejected) {
+                    // applicable only for initial binding
+                    Y_VERIFY_DEBUG(!Binding->RootNodeId);
+                    rejectReason = "peer";
+                } else {
+                    const ui32 prevRootNodeId = std::exchange(Binding->RootNodeId, record.GetRootNodeId());
+                    if (Binding->RootNodeId == SelfId().NodeId()) {
+                        // root node changes and here we are in cycle -- break it
+                        SendEvent(*Binding, std::make_unique<TEvNodeConfigUnbind>());
+                        rejected = true;
+                        rejectReason = "self";
+                    } else if (prevRootNodeId != Binding->RootNodeId) {
+                        if (prevRootNodeId) {
+                            STLOG(PRI_DEBUG, BS_NODE, NWDC13, "Binding updated", (Binding, Binding));
+                        } else {
+                            STLOG(PRI_DEBUG, BS_NODE, NWDC07, "Binding established", (Binding, Binding));
+                        }
+                        rootUpdated = true;
+                    }
+                }
+
+                if (rejected) {
+                    STLOG(PRI_DEBUG, BS_NODE, NWDC06, "Binding rejected", (Binding, Binding), (Reason, rejectReason));
+
+                    // binding needs to be reestablished
+                    const TActorId sessionId = Binding->SessionId;
+                    Binding.reset();
+                    IssueNextBindRequest();
+
+                    // unsubscribe from peer node unless there are incoming bindings active
+                    UnsubscribeInterconnect(senderNodeId, sessionId);
+                }
+
+                // check if we have newer configuration from the peer
+                bool configUpdated = false;
+                if (record.HasStorageConfig()) {
+                    const auto& config = record.GetStorageConfig();
+                    if (StorageConfig.GetGeneration() < config.GetGeneration()) {
+                        StorageConfig.Swap(record.MutableStorageConfig());
+                        configUpdated = true;
+                    }
+                }
+
+                // fan-out updates to the following peers
+                if (configUpdated || rootUpdated) {
+                    for (const auto& [nodeId, info] : DirectBoundNodes) {
+                        SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(
+                            configUpdated ? &StorageConfig : nullptr, GetRootNodeId()));
+                    }
+                }
+            } else {
+                // a race is possible when we have cancelled the binding, but there were updates in flight
+            }
+        }
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Binding requests from peer nodes
+
+        void AddBound(ui32 nodeId, TEvNodeConfigPush *msg) {
+            if (const auto [it, inserted] = AllBoundNodes.try_emplace(nodeId, 1); inserted) {
+                if (msg) {
+                    msg->Record.AddNewBoundNodeIds(nodeId);
+                }
+                if (nodeId != SelfId().NodeId()) {
+                    BindQueue.Disable(nodeId);
+                }
+            } else {
+                ++it->second;
+            }
+        }
+
+        void DeleteBound(ui32 nodeId, TEvNodeConfigPush *msg) {
+            const auto it = AllBoundNodes.find(nodeId);
+            Y_VERIFY(it != AllBoundNodes.end());
+            if (!--it->second) {
+                AllBoundNodes.erase(it);
+                if (msg) {
+                    msg->Record.AddDeletedBoundNodeIds(nodeId);
+                }
+                if (nodeId != SelfId().NodeId()) {
+                    BindQueue.Enable(nodeId);
+                }
+            }
+        }
+
+        void Handle(TEvNodeConfigPush::TPtr ev) {
+            const ui32 senderNodeId = ev->Sender.NodeId();
+            Y_VERIFY(senderNodeId != SelfId().NodeId());
+            auto& record = ev->Get()->Record;
+
+            STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+                (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record));
+
+            // check if we can't accept this message (or else it would make a cycle)
+            if (record.GetInitial()) {
+                bool reject = Binding && senderNodeId == Binding->RootNodeId;
+                if (!reject) {
+                    for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
+                        if (nodeId == SelfId().NodeId()) {
+                            reject = true;
+                            break;
+                        }
+                    }
+                }
+                if (reject) {
+                    STLOG(PRI_DEBUG, BS_NODE, NWDC03, "TEvNodeConfigPush rejected", (NodeId, senderNodeId),
+                        (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding),
+                        (Record, record));
+                    SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected());
+                    return;
+                }
+            }
+
+            // prepare configuration push down
+            auto downEv = Binding
+                ? std::make_unique<TEvNodeConfigPush>()
+                : nullptr;
+
+            // and configuration push up
+            auto upEv = record.GetInitial()
+                ? std::make_unique<TEvNodeConfigReversePush>(nullptr, GetRootNodeId())
+                : nullptr;
+
+            // insert new connection into map (if there is none)
+            const auto [it, inserted] = DirectBoundNodes.try_emplace(senderNodeId, ev->Cookie, ev->InterconnectSession);
+            TBoundNode& info = it->second;
+
+            if (inserted) {
+                if (!record.GetInitial()) {
+                    // may be a race with rejected queries
+                    DirectBoundNodes.erase(it);
+                    return;
+                } else {
+                    // subscribe to the session -- we need to know when the channel breaks
+                    TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, 0, ev->InterconnectSession,
+                        SelfId(), nullptr, IncomingBindingCookie));
+                }
+
+                // account newly bound node itself and add it to the record
+                AddBound(senderNodeId, downEv.get());
+            } else if (ev->Cookie != info.Cookie || ev->InterconnectSession != info.SessionId) {
+                STLOG(PRI_CRIT, BS_NODE, NWDC12, "distributed configuration protocol violation: cookie/session mismatch",
+                    (Sender, ev->Sender),
+                    (Cookie, ev->Cookie),
+                    (SessionId, ev->InterconnectSession),
+                    (ExpectedCookie, info.Cookie),
+                    (ExpectedSessionId, info.SessionId));
+
+                Y_VERIFY_DEBUG(false);
+                return;
+            }
+
+            // process added items
+            for (const ui32 nodeId : record.GetNewBoundNodeIds()) {
+                if (info.BoundNodeIds.insert(nodeId).second) {
+                    AddBound(nodeId, downEv.get());
+                } else {
+                    STLOG(PRI_CRIT, BS_NODE, NWDC04, "distributed configuration protocol violation: adding duplicate item",
+                        (Sender, ev->Sender),
+                        (Cookie, ev->Cookie),
+                        (SessionId, ev->InterconnectSession),
+                        (Record, record),
+                        (NodeId, nodeId));
+
+                    Y_VERIFY_DEBUG(false);
+                }
+            }
+
+            // process deleted items
+            for (const ui32 nodeId : record.GetDeletedBoundNodeIds()) {
+                if (info.BoundNodeIds.erase(nodeId)) {
+                    DeleteBound(nodeId, downEv.get());
+                } else {
+                    STLOG(PRI_CRIT, BS_NODE, NWDC05, "distributed configuration protocol violation: deleting nonexisting item",
+                        (Sender, ev->Sender),
+                        (Cookie, ev->Cookie),
+                        (SessionId, ev->InterconnectSession),
+                        (Record, record),
+                        (NodeId, nodeId));
+
+                    Y_VERIFY_DEBUG(false);
+                }
+            }
+
+            // process configuration update
+            if (record.HasStorageConfig()) {
+                const auto& config = record.GetStorageConfig();
+                if (StorageConfig.GetGeneration() < config.GetGeneration()) {
+                    StorageConfig.Swap(record.MutableStorageConfig());
+                    if (downEv) {
+                        downEv->Record.MutableStorageConfig()->CopyFrom(StorageConfig);
+                    }
+
+                    for (const auto& [nodeId, info] : DirectBoundNodes) {
+                        if (nodeId != senderNodeId) {
+                            SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(&StorageConfig, GetRootNodeId()));
+                        }
+                    }
+                } else if (config.GetGeneration() < StorageConfig.GetGeneration() && upEv) {
+                    upEv->Record.MutableStorageConfig()->CopyFrom(StorageConfig);
+                }
+            }
+
+            if (downEv && (downEv->Record.HasStorageConfig() || downEv->Record.NewBoundNodeIdsSize() ||
+                    downEv->Record.DeletedBoundNodeIdsSize())) {
+                SendEvent(*Binding, std::move(downEv));
+            }
+            if (upEv) {
+                SendEvent(senderNodeId, info, std::move(upEv));
+            }
+        }
+
+        void Handle(TEvNodeConfigUnbind::TPtr ev) {
+            const ui32 senderNodeId = ev->Sender.NodeId();
+            Y_VERIFY(senderNodeId != SelfId().NodeId());
+
+            STLOG(PRI_DEBUG, BS_NODE, NWDC16, "TEvNodeConfigUnbind", (NodeId, senderNodeId), (Cookie, ev->Cookie),
+                (SessionId, ev->InterconnectSession), (Binding, Binding));
+
+            if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() &&
+                    ev->Cookie == it->second.Cookie && ev->InterconnectSession == it->second.SessionId) {
+                UnbindNode(it->first, false);
+            } else {
+                STLOG(PRI_CRIT, BS_NODE, NWDC08, "distributed configuration protocol violation: unexpected unbind event",
+                    (Sender, ev->Sender),
+                    (Cookie, ev->Cookie),
+                    (SessionId, ev->InterconnectSession));
+
+                Y_VERIFY_DEBUG(false);
+            }
+        }
+
+        void UnbindNode(ui32 nodeId, bool byDisconnect) {
+            if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
+                TBoundNode& info = it->second;
+
+                auto ev = Binding ? std::make_unique<TEvNodeConfigPush>() : nullptr;
+
+                DeleteBound(nodeId, ev.get());
+                for (const ui32 boundNodeId : info.BoundNodeIds) {
+                    DeleteBound(boundNodeId, ev.get());
+                }
+
+                if (ev && ev->Record.DeletedBoundNodeIdsSize()) {
+                    SendEvent(*Binding, std::move(ev));
+                }
+
+                const TActorId sessionId = info.SessionId;
+                DirectBoundNodes.erase(it);
+
+                if (!byDisconnect) {
+                    UnsubscribeInterconnect(nodeId, sessionId);
+                }
+
+                IssueNextBindRequest();
+            }
+        }
+
+        ui32 GetRootNodeId() const {
+            return Binding && Binding->RootNodeId ? Binding->RootNodeId : SelfId().NodeId();
+        }
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Event delivery
+
+        void SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
+            Y_VERIFY(nodeId != SelfId().NodeId());
+            auto handle = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
+            Y_VERIFY(sessionId);
+            handle->Rewrite(TEvInterconnect::EvForward, sessionId);
+            TActivationContext::Send(handle.release());
+        }
+
+        void SendEvent(TBinding& binding, std::unique_ptr<IEventBase> ev) {
+            if (binding.SessionId) {
+                SendEvent(binding.NodeId, binding.Cookie, binding.SessionId, std::move(ev));
+            } else {
+                binding.PendingEvents.push_back(std::move(ev));
+            }
+        }
+
+        void SendEvent(IEventHandle& handle, std::unique_ptr<IEventBase> ev) {
+            SendEvent(handle.Sender.NodeId(), handle.Cookie, handle.InterconnectSession, std::move(ev));
+        }
+
+        void SendEvent(ui32 nodeId, const TBoundNode& info, std::unique_ptr<IEventBase> ev) {
+            SendEvent(nodeId, info.Cookie, info.SessionId, std::move(ev));
+        }
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Connectivity handling
+
+        void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
+            const ui32 nodeId = ev->Get()->NodeId;
+            STLOG(PRI_DEBUG, BS_NODE, NWDC15, "TEvNodeDisconnected", (NodeId, nodeId));
+            UnbindNode(nodeId, true);
+            if (Binding && Binding->NodeId == nodeId) {
+                STLOG(PRI_DEBUG, BS_NODE, NWDC10, "Binding aborted by disconnection", (Binding, Binding));
+
+                Binding.reset();
+                IssueNextBindRequest();
+
+                for (const auto& [nodeId, info] : DirectBoundNodes) {
+                    SendEvent(nodeId, info.Cookie, info.SessionId, std::make_unique<TEvNodeConfigReversePush>(nullptr,
+                        GetRootNodeId()));
+                }
+            }
+        }
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Consistency checking
+
+        void ConsistencyCheck() {
+#ifndef NDEBUG
+            THashMap<ui32, ui32> refAllBoundNodeIds;
+            for (const auto& [nodeId, info] : DirectBoundNodes) {
+                ++refAllBoundNodeIds[nodeId];
+                for (const ui32 boundNodeId : info.BoundNodeIds) {
+                    ++refAllBoundNodeIds[boundNodeId];
+                }
+            }
+            Y_VERIFY(AllBoundNodes == refAllBoundNodeIds);
+
+            for (const auto& [nodeId, info] : DirectBoundNodes) {
+                Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), nodeId));
+            }
+            if (Binding) {
+                Y_VERIFY(std::binary_search(NodeIds.begin(), NodeIds.end(), Binding->NodeId));
+            }
+#endif
+        }
+
+        STFUNC(StateWaitForList) {
+            switch (ev->GetTypeRewrite()) {
+                case TEvInterconnect::TEvNodesInfo::EventType:
+                    PendingEvents.push_front(std::move(ev));
+                    [[fallthrough]];
+                case TEvPrivate::EvProcessPendingEvent:
+                    Y_VERIFY(!PendingEvents.empty());
+                    StateFunc(PendingEvents.front());
+                    PendingEvents.pop_front();
+                    if (PendingEvents.empty()){
+                        Become(&TThis::StateFunc);
+                    } else {
+                        TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0));
+                    }
+                    break;
+
+                default:
+                    PendingEvents.push_back(std::move(ev));
+                    break;
+            }
+        }
+
+        STFUNC(StateFunc) {
+            STRICT_STFUNC_BODY(
+                hFunc(TEvNodeConfigPush, Handle);
+                hFunc(TEvNodeConfigReversePush, Handle);
+                hFunc(TEvNodeConfigUnbind, Handle);
+                hFunc(TEvInterconnect::TEvNodesInfo, Handle);
+                hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+                hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+                cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+                cFunc(TEvents::TSystem::Poison, PassAway);
+            )
+            ConsistencyCheck();
+        }
+    };
+
+    void TNodeWarden::StartDistributedConfigKeeper() {
+        DistributedConfigKeeperId = Register(new TDistributedConfigKeeper);
+    }
+
+    void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) {
+        ev->Rewrite(ev->GetTypeRewrite(), DistributedConfigKeeperId);
+        TActivationContext::Send(ev.Release());
+    }
+
+} // NKikimr::NStorage

+ 50 - 0
ydb/core/blobstorage/nodewarden/node_warden_events.h

@@ -0,0 +1,50 @@
+#pragma once
+
+#include "defs.h"
+
+#include <ydb/core/protos/blobstorage_distributed_config.pb.h>
+
+namespace NKikimr::NStorage {
+
+    struct TEvNodeConfigPush
+        : TEventPB<TEvNodeConfigPush, NKikimrBlobStorage::TEvNodeConfigPush, TEvBlobStorage::EvNodeConfigPush>
+    {
+        TEvNodeConfigPush() = default;
+
+        // ctor for initial push request
+        TEvNodeConfigPush(const NKikimrBlobStorage::TStorageConfig *config, const THashMap<ui32, ui32>& boundNodeIds) {
+            if (config) {
+                Record.MutableStorageConfig()->CopyFrom(*config);
+            }
+            for (const auto [nodeId, counter] : boundNodeIds) {
+                Record.AddNewBoundNodeIds(nodeId);
+            }
+            Record.SetInitial(true);
+        }
+    };
+
+    struct TEvNodeConfigReversePush
+        : TEventPB<TEvNodeConfigReversePush, NKikimrBlobStorage::TEvNodeConfigReversePush, TEvBlobStorage::EvNodeConfigReversePush>
+    {
+        TEvNodeConfigReversePush() = default;
+
+        TEvNodeConfigReversePush(const NKikimrBlobStorage::TStorageConfig *config, ui32 rootNodeId) {
+            if (config) {
+                Record.MutableStorageConfig()->CopyFrom(*config);
+            }
+            Record.SetRootNodeId(rootNodeId);
+        }
+
+        static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected() {
+            auto res = std::make_unique<TEvNodeConfigReversePush>();
+            res->Record.SetRejected(true);
+            return res;
+        }
+    };
+
+    struct TEvNodeConfigUnbind
+        : TEventPB<TEvNodeConfigUnbind, NKikimrBlobStorage::TEvNodeConfigUnbind, TEvBlobStorage::EvNodeConfigUnbind>
+    {
+    };
+
+} // NKikimr::NStorage

Some files were not shown because too many files changed in this diff