Browse Source

Intermediate changes

robot-piglet 10 months ago
parent
commit
9c5e5bdb2b

+ 375 - 0
yt/yt/core/misc/consistent_hashing_ring-inl.h

@@ -0,0 +1,375 @@
+#ifndef CONSISTENT_HASHING_RING_INL_H_
+#error "Direct inclusion of this file is not allowed, include consistent_hashing_ring.h"
+// For the sake of sane code completion.
+#include "consistent_hashing_ring.h"
+#endif
+
+#include <util/generic/algorithm.h>
+
+namespace NYT {
+
+///////////////////////////////////////////////////////////////////////////////
+
+namespace NPrivate {
+
+template <class H, class O>
+class THashRangeIterator
+{
+public:
+    THashRangeIterator(O object, H hasher, int iteration = 0)
+        : Object_(object)
+        , Hasher_(hasher)
+        , Iteration_(iteration)
+    { }
+
+    ui64 operator*() const
+    {
+        return Hasher_(Object_, Iteration_);
+    }
+
+    THashRangeIterator& operator++()
+    {
+        ++Iteration_;
+        return *this;
+    }
+
+    bool operator==(THashRangeIterator rhs) const
+    {
+        YT_ASSERT(Object_ == rhs.Object_);
+        return Iteration_ == rhs.Iteration_;
+    }
+
+protected:
+    int GetIteration() const
+    {
+        return Iteration_;
+    }
+
+    typename NMpl::TCallTraits<O>::TType GetObject() const
+    {
+        return Object_;
+    }
+
+private:
+    O Object_;
+    H Hasher_;
+    int Iteration_;
+};
+
+template <class H, class O, class I>
+class TItemRangeIterator
+    : protected THashRangeIterator<H, O>
+{
+private:
+    using TBase = THashRangeIterator<H, O>;
+
+public:
+    using TBase::TBase;
+
+    std::pair<ui64, I> operator*() const{
+        return {TBase::operator*(), I{TBase::GetObject(), TBase::GetIteration()}};
+    }
+
+    TItemRangeIterator& operator++()
+    {
+        TBase::operator++();
+        return *this;
+    }
+
+    bool operator==(TItemRangeIterator rhs) const
+    {
+        return static_cast<const TBase&>(*this) == static_cast<const TBase&>(rhs);
+    }
+};
+
+template <class H, class O>
+using THashRange = std::pair<THashRangeIterator<H, O>, THashRangeIterator<H, O>>;
+
+template <class H, class O, class I>
+using TItemRange = std::pair<TItemRangeIterator<H, O, I>, TItemRangeIterator<H, O, I>>;
+
+// TODO(shakurov): hashes alone are never sufficient. Transform hash range into token range?
+template <class H, class O>
+THashRange<H, O> GetHashRange(typename NMpl::TCallTraits<O>::TType object, int hashCount)
+{
+    return {
+        THashRangeIterator(object, H()),
+        THashRangeIterator(object, H(), hashCount)
+    };
+}
+
+template <class H, class O, class I>
+TItemRange<H, O, I> GetItemRange(typename NMpl::TCallTraits<O>::TType object, int hashCount)
+{
+    return {
+        TItemRangeIterator<H, O, I>(object, H()),
+        TItemRangeIterator<H, O, I>(object, H(), hashCount)
+    };
+}
+
+} // namespace NPrivate
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Just a marker - see operator== below.
+template <class S, class F, class P, class H, int N>
+class TConsistentHashingRing<S, F, P, H, N>::TFileRangeEndIterator
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class S, class F, class P, class H, int N>
+class TConsistentHashingRing<S, F, P, H, N>::TFileRangeIterator
+{
+public:
+    typename NMpl::TCallTraits<F>::TType operator*() const
+    {
+        const auto& ringItem = RingIt_->second;
+        YT_ASSERT(std::holds_alternative<TFileReplica>(ringItem));
+        return std::get<TFileReplica>(ringItem).File;
+    }
+
+    bool operator==(const TFileRangeIterator& rhs) const
+    {
+        YT_ASSERT(&Owner_ == &rhs.Owner_);
+        return TokenRange_ == rhs.TokenRange_ && RingIt_ == rhs.RingIt_;
+    }
+
+    bool operator==(const TFileRangeEndIterator& /*rhs*/) const
+    {
+        return TokenRange_.first == TokenRange_.second;
+    }
+
+    TFileRangeIterator& operator++()
+    {
+        IncrementRingIterator();
+        if (!std::holds_alternative<TFileReplica>(RingIt_->second)) {
+            ++TokenRange_.first;
+            SkipToNextFileReplica();
+        }
+
+        return *this;
+    }
+
+private:
+    TFileRangeIterator(
+        const TConsistentHashingRing<S, F, P, H, N>& owner,
+        NPrivate::TItemRange<H, S, TServerToken> tokenRange)
+        : Owner_(owner)
+        , TokenRange_(std::move(tokenRange))
+    {
+        SkipToNextFileReplica();
+    }
+
+    void SkipToNextFileReplica()
+    {
+        while (TokenRange_.first != TokenRange_.second) {
+            RingIt_ = Ring().find(*TokenRange_.first);
+            YT_VERIFY(RingIt_ != Ring().end());
+            IncrementRingIterator();
+            if (std::holds_alternative<TFileReplica>(RingIt_->second)) {
+                break;
+            } // Otherwise this server token has zero associated files.
+            ++TokenRange_.first;
+        }
+        // Just in case.
+        if (TokenRange_.first == TokenRange_.second) {
+            RingIt_ = Ring().end();
+        }
+    }
+
+    void IncrementRingIterator()
+    {
+        if (++RingIt_ == Ring().end()) {
+            RingIt_ = Ring().begin();
+        }
+    }
+
+    const TConsistentHashingRing<S, F, P, H, N>::TRing& Ring() const
+    {
+        return Owner_.Ring_;
+    }
+
+    using TServerToken = typename TConsistentHashingRing::TServerToken;
+    using TFileReplica = typename TConsistentHashingRing::TFileReplica;
+    using TRingIterator = typename TConsistentHashingRing::TRing::const_iterator;
+
+    const TConsistentHashingRing<S, F, P, H, N>& Owner_;
+    NPrivate::TItemRange<H, S, TServerToken> TokenRange_;
+    TRingIterator RingIt_;
+
+    friend class TConsistentHashingRing;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class S, class F, class P, class H, int N>
+bool TConsistentHashingRing<S, F, P, H, N>::TServerToken::operator<(const TServerToken& rhs) const
+{
+    if (P()(Server, rhs.Server)) {
+        return true;
+    }
+    if (P()(rhs.Server, Server)) {
+        return false;
+    }
+    return Index < rhs.Index;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class S, class F, class P, class H, int N>
+bool TConsistentHashingRing<S, F, P, H, N>::TFileReplica::operator<(const TFileReplica& rhs) const
+{
+    if (P()(File, rhs.File)) {
+        return true;
+    }
+    if (P()(rhs.File, File)) {
+        return false;
+    }
+    return Index < rhs.Index;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class S, class F, class P, class H, int N>
+template <class TItem>
+bool TConsistentHashingRing<S, F, P, H, N>::TRingCompare<TItem>::operator()(const std::pair<ui64, TItem>& lhs, const std::pair<ui64, TItem>& rhs) const
+{
+    return lhs < rhs;
+}
+
+template <class S, class F, class P, class H, int N>
+template <class TItem>
+bool TConsistentHashingRing<S, F, P, H, N>::TRingCompare<TItem>::operator()(ui64 lhs, const std::pair<ui64, TItem>& rhs) const
+{
+    // Naked hash is always less than corresponding items.
+    if (lhs == rhs.first) {
+        return true;
+    }
+    return lhs < rhs.first;
+}
+
+template <class S, class F, class P, class H, int N>
+template <class TItem>
+bool TConsistentHashingRing<S, F, P, H, N>::TRingCompare<TItem>::operator()(const std::pair<ui64, TItem>& lhs, ui64 rhs) const
+{
+    // Naked hash is always less than corresponding items.
+    if (lhs.first == rhs) {
+        return false;
+    }
+    return lhs.first < rhs;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class S, class F, class P, class H, int N>
+int TConsistentHashingRing<S, F, P, H, N>::GetSize() const
+{
+    return Ring_.size();
+}
+
+template <class S, class F, class P, class H, int N>
+int TConsistentHashingRing<S, F, P, H, N>::GetTokenCount() const
+{
+    return TokenRing_.size();
+}
+
+template <class S, class F, class P, class H, int N>
+void TConsistentHashingRing<S, F, P, H, N>::AddServer(typename NMpl::TCallTraits<S>::TType server, int tokenCount)
+{
+    YT_VERIFY(tokenCount > 0);
+
+    for (auto [it, ite] = NPrivate::GetItemRange<H, S, TServerToken>(server, tokenCount); it != ite; ++it) {
+        auto [hash, serverToken] = *it;
+        EmplaceOrCrash(Ring_, hash, serverToken);
+        EmplaceOrCrash(TokenRing_, hash, serverToken);
+    }
+}
+
+template <class S, class F, class P, class H, int N>
+void TConsistentHashingRing<S, F, P, H, N>::RemoveServer(typename NMpl::TCallTraits<S>::TType server, int tokenCount)
+{
+    YT_VERIFY(tokenCount > 0);
+
+    for (auto [it, ite] = NPrivate::GetItemRange<H, S, TServerToken>(server, tokenCount); it != ite; ++it) {
+        auto [hash, serverToken] = *it;
+        EraseOrCrash(Ring_, std::pair(hash, serverToken));
+        EraseOrCrash(TokenRing_, std::pair(hash, serverToken));
+    }
+}
+
+template <class S, class F, class P, class H, int N>
+void TConsistentHashingRing<S, F, P, H, N>::AddFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount)
+{
+    for (auto [it, ite] = NPrivate::GetItemRange<H, F, TFileReplica>(file, replicaCount); it != ite; ++it) {
+        auto [hash, fileReplica] = *it;
+        EmplaceOrCrash(Ring_, hash, fileReplica);
+    }
+}
+
+template <class S, class F, class P, class H, int N>
+void TConsistentHashingRing<S, F, P, H, N>::RemoveFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount)
+{
+    for (auto [it, ite] = NPrivate::GetItemRange<H, F, TFileReplica>(file, replicaCount); it != ite; ++it) {
+        auto [hash, fileReplica] = *it;
+        EraseOrCrash(Ring_, std::pair(hash, fileReplica));
+    }
+}
+
+template <class S, class F, class P, class H, int N>
+typename TConsistentHashingRing<S, F, P, H, N>::TFileRange
+TConsistentHashingRing<S, F, P, H, N>::GetFileRangeForServer(typename NMpl::TCallTraits<S>::TType server, int tokenCount) const
+{
+    YT_VERIFY(tokenCount > 0);
+
+    return {
+        TFileRangeIterator(*this, NPrivate::GetItemRange<H, S, TServerToken>(server, tokenCount)),
+        TFileRangeEndIterator()};
+}
+
+template <class S, class F, class P, class H, int N>
+TCompactVector<S, N> TConsistentHashingRing<S, F, P, H, N>::GetServersForFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount) const
+{
+    auto targetTokens = GetTokensForFile(file, replicaCount);
+    TCompactVector<S, N> result;
+    result.reserve(targetTokens.size());
+    std::transform(
+        targetTokens.begin(),
+        targetTokens.end(),
+        std::back_inserter(result),
+        [] (const TServerToken& token) {
+            return token.Server;
+        });
+    return result;
+}
+
+template <class S, class F, class P, class H, int N>
+TCompactVector<typename TConsistentHashingRing<S, F, P, H, N>::TServerToken, N>
+TConsistentHashingRing<S, F, P, H, N>::GetTokensForFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount) const
+{
+    using TResult = TCompactVector<typename TConsistentHashingRing<S, F, P, H, N>::TServerToken, N>;
+
+    if (TokenRing_.empty()) {
+        return {};
+    }
+
+    TResult result;
+    result.reserve(replicaCount);
+
+    for (auto [hashIt, hashIte] = NPrivate::GetHashRange<H, F>(file, replicaCount); hashIt != hashIte; ++hashIt) {
+        auto tokenIt = TokenRing_.upper_bound(*hashIt);
+        if (tokenIt == TokenRing_.begin()) {
+            tokenIt = --TokenRing_.end();
+        } else {
+            --tokenIt;
+        }
+
+        result.push_back(tokenIt->second);
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT

+ 147 - 0
yt/yt/core/misc/consistent_hashing_ring.h

@@ -0,0 +1,147 @@
+#pragma once
+
+#include "public.h"
+
+#include "mpl.h"
+
+#include <library/cpp/yt/small_containers/compact_vector.h>
+
+#include <variant>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! A device that tries to establish a consistent matching of a set of 'files' to a set of 'servers'.
+/*!
+   The algorithm tries to keep the number of files to relocate in case of a
+   server outage (i.e. the server having been removed from the ring) manageable.
+   This is achieved via placing ("marking") a server on the ring not once, but
+   multiple times at multiple places (such individual mark is called a "token").
+
+   Similarly, placing several replicas of a file on the ring is explicitly
+   supported (an individual mark is called a "replica").
+
+   Template parameter description:
+   - S is server;
+   - F is file;
+   - P is comparator. It must be able to compare servers and files:
+       p(s, s) -> bool
+       p(f, f) -> bool
+     where p, s and f are instances of P, S and F, respectively.
+   - H is hasher. It must be able to produce a series of hashes from a server or a file:
+       h(s, i) -> ui64
+       h(f, i) -> ui64
+     where i is an int, and h, s and f are instances of H, S and F, respectively.
+     (NB: care should be taken to avoid serial hash collisions. That is, if
+       h(s1, 0) == h(s2, 0) it should be likely that h(s1, 1) != h(s2, 1).
+     This is the reason why a hasher is called with both and object to hash and
+     an additional sequence number.)
+
+   TODO(shakurov): express requirements to P and H as C++ concepts.
+*/
+template <class S, class F, class P, class H, int N>
+class TConsistentHashingRing
+{
+public:
+    class TFileRangeIterator;
+    class TFileRangeEndIterator;
+    using TFileRange = std::pair<TFileRangeIterator, TFileRangeEndIterator>;
+
+    int GetSize() const;
+    int GetTokenCount() const;
+
+    //! Places #tokenCount tokens for this server on the ring.
+    /*!
+     *  The server must not have been added yet.
+     *
+     *  /note Adding a server is likely to change files-server associations.
+     *  Consider calling #GetFileRangeForServer immediately after to identify
+     *  affected files.
+     */
+    void AddServer(typename NMpl::TCallTraits<S>::TType server, int tokenCount);
+
+    //! Removes server tokens from the ring.
+    /*!
+     *  The server must have been previously added (and not subsequently removed).
+     *  Token count must be exactly the same as the number of tokens that have been added.
+     *
+     *  /note Removing a server is likely to change files-server associations.
+     *  Consider calling #GetFileRangeForServer immediately before removal
+     *  to identify affected files.
+     */
+    void RemoveServer(typename NMpl::TCallTraits<S>::TType server, int tokenCount);
+
+    //! Places file replicas on the ring.
+    void AddFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount);
+
+    //! Removes file replicas from the ring. #replicaCount must be exactly the
+    //! same as has been previously added.
+    void RemoveFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount);
+
+    //! Returns an iterator over a set of files matched to the specified server
+    //! (according the current state of the ring).
+    /*!
+     *  The returned range doesn't distinguish same-file replicas and thus may
+     *  produce duplicate files.
+     *
+     *  /note The range is invalidated if any change is made to the ring.
+     */
+    TFileRange GetFileRangeForServer(typename NMpl::TCallTraits<S>::TType server, int tokenCount) const;
+
+    //! Returns the list of servers to which replicas of the specified files are
+    //! matched (according the current state of the ring).
+    /*!
+     *  /note May return duplicates.
+     */
+    TCompactVector<S, N> GetServersForFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount) const;
+
+private:
+    struct TServerToken
+    {
+        S Server;
+        int Index;
+
+        bool operator<(const TServerToken& rhs) const;
+    };
+
+    struct TFileReplica
+    {
+        F File;
+        int Index;
+
+        bool operator<(const TFileReplica& rhs) const;
+    };
+
+    // The order is important: files must be "greater than" servers so that, in
+    // case a file and a server hash to the same value, the former is associated
+    // with the latter (and not with some other server).
+    using TRingItem = std::variant<TServerToken, TFileReplica>;
+
+    // Naked hash is always less then corresponding items.
+    template <class TItem>
+    struct TRingCompare
+    {
+        using is_transparent = void;
+
+        bool operator()(const std::pair<ui64, TItem>& lhs, const std::pair<ui64, TItem>& rhs) const;
+        bool operator()(ui64 lhs, const std::pair<ui64, TItem>& rhs) const;
+        bool operator()(const std::pair<ui64, TItem>& lhs, ui64 rhs) const;
+    };
+
+    using TRing = std::set<std::pair<ui64, TRingItem>, TRingCompare<TRingItem>>;
+    using TServerTokenRing = std::set<std::pair<ui64, TServerToken>, TRingCompare<TServerToken>>;
+
+    TCompactVector<TServerToken, N> GetTokensForFile(typename NMpl::TCallTraits<F>::TType file, int replicaCount) const;
+
+    TRing Ring_;
+    TServerTokenRing TokenRing_; // No, not *that* token ring :-)
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define CONSISTENT_HASHING_RING_INL_H_
+#include "consistent_hashing_ring-inl.h"
+#undef CONSISTENT_HASHING_RING_INL_H_

+ 605 - 0
yt/yt/core/misc/unittests/consistent_hashing_ut.cpp

@@ -0,0 +1,605 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/misc/consistent_hashing_ring.h>
+#include <yt/yt/core/misc/config.h>
+#include <yt/yt/core/misc/digest.h>
+
+#include <library/cpp/yt/string/raw_formatter.h>
+
+#include <algorithm>
+#include <random>
+#include <string>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TStringComparer
+{
+    bool operator()(const TString& lhs, const TString& rhs) const
+    {
+        return lhs < rhs;
+    }
+};
+
+struct TStringHasher
+{
+    ui64 operator()(const TString& node, int index) const
+    {
+        return ::THash<TStringBuf>()(node + std::to_string(index));
+    }
+};
+
+struct TCustomStringHasher
+{
+    ui64 operator()(const TString& node, int index) const
+    {
+        if (index == 0) {
+            if (node == "a") {
+                return 0;
+            }
+            if (node == "b") {
+                return 0;
+            }
+            if (node == "c") {
+                return 1;
+            }
+            return node[0] - 'a';
+        }
+        if (index == 1) {
+            return node[0] - 'a' + 2;
+        }
+        YT_UNIMPLEMENTED();
+    }
+};
+
+TEST(TConsistentHashingRing, CheckCollision)
+{
+    TConsistentHashingRing<TString, TString, TStringComparer, TCustomStringHasher, 1> ring;
+    ring.AddFile("a", 1);
+    ring.AddServer("b", 1);
+    ring.AddServer("c", 1);
+    EXPECT_EQ(ring.GetServersForFile("a", 1)[0], "c");
+}
+
+void CheckServers(const TCompactVector<TString, 1>& src, const std::vector<TString>& target)
+{
+    EXPECT_EQ(src.size(), target.size());
+    EXPECT_TRUE(std::equal(src.begin(), src.end(), target.begin()));
+}
+
+TEST(TConsistentHashingRing, AddRemove)
+{
+    TConsistentHashingRing<TString, TString, TStringComparer, TCustomStringHasher, 1> ring;
+    ring.AddFile("a", 1);
+    ring.AddServer("b", 1);
+    CheckServers(ring.GetServersForFile("a", 1), {"b"});
+
+    ring.AddFile("d", 1);
+    CheckServers(ring.GetServersForFile("d", 1), {"b"});
+    ring.AddServer("e", 1);
+
+    CheckServers(ring.GetServersForFile("d", 1), {"b"});
+    CheckServers(ring.GetServersForFile("a", 1), {"e"});
+
+    ring.AddFile("g", 1);
+
+    CheckServers(ring.GetServersForFile("d", 1), {"b"});
+    CheckServers(ring.GetServersForFile("a", 1), {"e"});
+    CheckServers(ring.GetServersForFile("g", 1), {"e"});
+
+    ring.AddServer("f", 1);
+
+    CheckServers(ring.GetServersForFile("d", 1), {"b"});
+    CheckServers(ring.GetServersForFile("a", 1), {"f"});
+    CheckServers(ring.GetServersForFile("g", 1), {"f"});
+
+    ring.RemoveServer("b", 1);
+
+    CheckServers(ring.GetServersForFile("d", 1),{"f"});
+    CheckServers(ring.GetServersForFile("a", 1), {"f"});
+    CheckServers(ring.GetServersForFile("g", 1), {"f"});
+
+    ring.AddServer("c", 1);
+    CheckServers(ring.GetServersForFile("d", 1), {"c"});
+    CheckServers(ring.GetServersForFile("a", 1), {"f"});
+    CheckServers(ring.GetServersForFile("g", 1), {"f"});
+}
+
+TEST(TConsistentHashingRing, AddRemoveManyReplicas)
+{
+    TConsistentHashingRing<TString, TString, TStringComparer, TCustomStringHasher, 1> ring;
+
+    ring.AddFile("a", 1);
+    ring.AddServer("b", 2);
+
+    CheckServers(ring.GetServersForFile("a", 1), {"b"});
+
+    ring.AddFile("e", 1);
+    ring.AddServer("d", 1);
+
+    CheckServers(ring.GetServersForFile("a", 1), {"d"});
+    CheckServers(ring.GetServersForFile("e", 1), {"d"});
+
+    ring.AddFile("f", 2);
+
+    CheckServers(ring.GetServersForFile("a", 1), {"d"});
+    CheckServers(ring.GetServersForFile("e", 1), {"d"});
+    CheckServers(ring.GetServersForFile("f", 2), {"d", "d"});
+
+    ring.AddServer("c", 2);
+
+    CheckServers(ring.GetServersForFile("a", 1), {"c"});
+    CheckServers(ring.GetServersForFile("e", 1), {"d"});
+    CheckServers(ring.GetServersForFile("f", 2), {"c", "c"});
+
+    ring.AddServer("a", 2);
+
+    CheckServers(ring.GetServersForFile("a", 1), {"c"});
+    CheckServers(ring.GetServersForFile("e", 1), {"d"});
+    CheckServers(ring.GetServersForFile("f", 2), {"c", "c"});
+
+    ring.RemoveFile("a", 1);
+
+    CheckServers(ring.GetServersForFile("e", 1), {"d"});
+    CheckServers(ring.GetServersForFile("f", 2), {"c", "c"});
+
+    ring.RemoveServer("d", 1);
+
+    CheckServers(ring.GetServersForFile("e", 1), {"b"});
+    CheckServers(ring.GetServersForFile("f", 2), {"c", "c"});
+}
+
+TEST(TConsistentHashingRing, CheckConsistency)
+{
+    TConsistentHashingRing<TString, TString, TStringComparer, TCustomStringHasher, 1> ring;
+
+    ring.AddFile("e", 1);
+
+    ring.AddServer("a", 2);
+    ring.AddServer("c", 2);
+    ring.AddServer("d", 1);
+
+    auto chunkResultBefore = ring.GetServersForFile("e", 1);
+    ring.RemoveServer("a", 2);
+    EXPECT_EQ(chunkResultBefore, ring.GetServersForFile("e", 1));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static constexpr size_t STRING_SIZE = 5;
+
+static constexpr size_t MOD = 531977;
+
+static constexpr size_t NODE_MULTIPLIER = 446179;
+static constexpr size_t INDEX_MULTIPLIER = 389891;
+
+static constexpr const char* POSSIBLE_SYMBOLS = "ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+static constexpr size_t POSSIBLE_SYMBOLS_COUNT = std::char_traits<char>::length(POSSIBLE_SYMBOLS);
+
+struct THasher
+{
+    ui64 operator()(int node, int index) const
+    {
+        return (NODE_MULTIPLIER * node + INDEX_MULTIPLIER * index) % MOD;
+    }
+};
+
+class TUniformGenerator
+{
+public:
+    explicit TUniformGenerator(size_t minValue = 0, size_t maxValue = MOD)
+    : Generator_(RandomDevice_())
+    , Distribution_(minValue, maxValue)
+    { }
+
+    ui64 operator()(size_t maxValue = MOD)
+    {
+        return Distribution_(Generator_) % maxValue;
+    }
+
+private:
+    std::random_device RandomDevice_;
+    std::mt19937 Generator_;
+    std::uniform_int_distribution<> Distribution_;
+};
+
+using TCrpItemWithToken = std::pair<TString, int>;
+
+enum class EQueryType
+{
+    AddServer,
+    DeleteServer,
+    AddFile,
+    DeleteFile
+};
+
+class TCrpItemsContainer
+{
+public:
+    bool Insert(const TCrpItemWithToken& crp)
+    {
+        if (ItemToIndex_.contains(crp)) {
+            return false;
+        }
+
+        ItemToIndex_[crp] = Items_.size();
+        Items_.push_back(crp);
+        return true;
+    }
+
+    TCrpItemWithToken EraseRandom(TUniformGenerator& generator)
+    {
+        auto deleteIndex = generator(Items_.size());
+        auto deleteItem = Items_[deleteIndex];
+        ItemToIndex_[Items_.back()] = deleteIndex;
+        ItemToIndex_.erase(deleteItem);
+
+        std::swap(Items_.back(), Items_[deleteIndex]);
+        Items_.pop_back();
+        return std::move(deleteItem);
+    }
+
+    std::vector<TCrpItemWithToken>::iterator begin()
+    {
+        return Items_.begin();
+    }
+
+    std::vector<TCrpItemWithToken>::iterator end()
+    {
+        return Items_.end();
+    }
+
+    size_t Size() const
+    {
+        return Items_.size();
+    }
+
+private:
+    std::vector<TCrpItemWithToken> Items_;
+    THashMap<TCrpItemWithToken, size_t> ItemToIndex_;
+};
+
+template <typename GS, typename GF, typename GQ>
+double GetPercentageInconsistentFiles(
+    GS serverGenerator,
+    GF fileGenerator,
+    size_t fileCount,
+    size_t serverCount,
+    size_t queryCount,
+    GQ queryGenerator,
+    size_t candidateCount,
+    size_t batchSize = 1)
+{
+    TConsistentHashingRing<TString, TString, TStringComparer, TStringHasher, 3> ring;
+
+    TCrpItemsContainer servers;
+    TCrpItemsContainer files;
+
+    for (size_t i = 0; i < fileCount; ++i) {
+        auto generatedFile = fileGenerator();
+        while (!files.Insert(generatedFile)) {
+            generatedFile = fileGenerator();
+        }
+        ring.AddFile(generatedFile.first, generatedFile.second);
+    }
+
+    for (size_t i = 0; i < serverCount; ++i) {
+        auto generatedServer = serverGenerator();
+        while (!servers.Insert(generatedServer)) {
+            generatedServer = serverGenerator();
+        }
+        ring.AddServer(generatedServer.first, generatedServer.second);
+    }
+
+    auto countDisplaced = [&] (const std::vector<std::pair<EQueryType, std::pair<TString, int>>>& queries) {
+        std::map<TCrpItemWithToken, TCompactVector<TString, 1>> serversBefore;
+        for (const auto& file: files) {
+            auto candidates = ring.GetServersForFile(file.first, file.second);
+            candidates.resize(std::min(static_cast<size_t>(file.second), candidateCount));
+            serversBefore[file] = candidates;
+        }
+
+        for (const auto& [queryType, itemWithToken]: queries) {
+            switch (queryType) {
+                case EQueryType::AddServer:
+                    ring.AddServer(itemWithToken.first, itemWithToken.second);
+                    break;
+                case EQueryType::DeleteServer:
+                    ring.RemoveServer(itemWithToken.first, itemWithToken.second);
+                    break;
+                case EQueryType::AddFile:
+                    ring.AddFile(itemWithToken.first, itemWithToken.second);
+                    break;
+                case EQueryType::DeleteFile:
+                    ring.RemoveFile(itemWithToken.first, itemWithToken.second);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        int result = 0;
+        for (const auto& elem: files) {
+            auto candidates = ring.GetServersForFile(elem.first, elem.second);
+            candidates.resize(std::min(static_cast<size_t>(elem.second), candidateCount));
+            result += (serversBefore[elem] != candidates);
+        }
+        return result;
+    };
+
+    TUniformGenerator generator;
+    int maxDisplacedFileCount = 0;
+    std::vector<std::pair<EQueryType, TCrpItemWithToken>> batch;
+    for (size_t i = 0; i < queryCount; ++i) {
+        auto [queryType, itemWithToken] = queryGenerator();
+        auto item = std::pair<EQueryType, TCrpItemWithToken>(queryType, itemWithToken);
+        switch (queryType) {
+            case EQueryType::AddServer:
+                while (!servers.Insert(itemWithToken)) {
+                    itemWithToken = queryGenerator().second;
+                }
+                break;
+            case EQueryType::DeleteServer:
+                if (servers.Size() > 0) {
+                    itemWithToken = servers.EraseRandom(generator);
+                } else {
+                    continue;
+                }
+                break;
+            case EQueryType::AddFile:
+                while (!files.Insert(itemWithToken)) {
+                    itemWithToken = queryGenerator().second;
+                }
+                break;
+            case EQueryType::DeleteFile:
+                if (files.Size() > 0) {
+                    itemWithToken = files.EraseRandom(generator);
+                } else {
+                    continue;
+                }
+                break;
+            default:
+                continue;
+        }
+
+        batch.emplace_back(queryType, std::move(itemWithToken));
+        if (batch.size() == batchSize) {
+            maxDisplacedFileCount = std::max(maxDisplacedFileCount, countDisplaced(std::move(batch)));
+            batch.clear();
+        }
+    }
+    maxDisplacedFileCount = std::max(maxDisplacedFileCount, countDisplaced(std::move(batch)));
+
+    auto inconsistentChunkPercentage = static_cast<double>(maxDisplacedFileCount) / static_cast<double>(fileCount);
+    return inconsistentChunkPercentage;
+}
+
+TCrpItemWithToken GenerateItem() {
+    auto generator = TUniformGenerator();
+
+    TString buffer;
+    for (size_t i = 0; i < STRING_SIZE; ++i) {
+        buffer.push_back(POSSIBLE_SYMBOLS[generator(POSSIBLE_SYMBOLS_COUNT)]);
+    }
+
+    int tokenCount = 1 + generator(5);
+    return TCrpItemWithToken(buffer, tokenCount);
+}
+
+TEST(TConsistentHashingRing, AddAndRemoveStress)
+{
+    auto generator = TUniformGenerator();
+
+    auto generateQuery = [&] () {
+        auto file = GenerateItem();
+        return std::pair<EQueryType, TCrpItemWithToken>(static_cast<EQueryType>(generator(4)), file);
+    };
+
+    auto result = GetPercentageInconsistentFiles(
+        /*serverGenerator*/ GenerateItem,
+        /*fileGenerator*/ GenerateItem,
+        /*fileCount*/ 1000,
+        /*serverCount*/ 1000,
+        /*queryCount*/ 10000,
+        /*queryGenerator*/ generateQuery,
+        /*candidateCount*/ 3);
+    EXPECT_LE(result, 0.07);
+}
+
+TEST(TConsistentHashingRing, AdditionBarrierStress)
+{
+    auto generator = TUniformGenerator();
+
+    size_t queriesGenerated = 0;
+    const size_t barrierAfter = 1000;
+    const size_t cntAddings = 180;
+
+    auto generateQuery = [&] () {
+        ++queriesGenerated;
+        auto item = GenerateItem();
+        if (queriesGenerated >= barrierAfter && queriesGenerated < cntAddings + barrierAfter) {
+            return std::pair<EQueryType, TCrpItemWithToken>(EQueryType::AddServer, item);
+        }
+        return std::pair<EQueryType, TCrpItemWithToken>(static_cast<EQueryType>(generator(4)), item);
+    };
+
+    auto result = GetPercentageInconsistentFiles(
+        /*serverGenerator*/ GenerateItem,
+        /*fileGenerator*/ GenerateItem,
+        /*fileCount*/ 1000,
+        /*serverCount*/ 1000,
+        /*queryCount*/ 5000,
+        /*queryGenerator*/ generateQuery,
+        /*candidateCount*/ 3);
+    EXPECT_LE(result, 0.07);
+}
+
+TEST(TConsistentHashingRing, ServerAdditionBarrierStress)
+{
+    auto generator = TUniformGenerator();
+
+    size_t queriesGenerated = 0;
+    const size_t queriesBeforeBarrier = 1000;
+    const size_t additionalServerCount = 180;
+
+    auto generateQuery = [&] () {
+        ++queriesGenerated;
+        auto item = GenerateItem();
+        if (queriesGenerated >= queriesBeforeBarrier && queriesGenerated < additionalServerCount + queriesBeforeBarrier) {
+            return std::pair<EQueryType, TCrpItemWithToken>(EQueryType::AddServer, item);
+        }
+        return std::pair<EQueryType, TCrpItemWithToken>(static_cast<EQueryType>(generator(2)), item);
+    };
+
+    auto result = GetPercentageInconsistentFiles(
+        /*serverGenerator*/ GenerateItem,
+        /*fileGenerator*/ GenerateItem,
+        /*fileCount*/ 1000,
+        /*serverCount*/ 1000,
+        /*queryCount=*/ 2000,
+        /*queryGenerator*/ generateQuery,
+        /*candidateCount*/ 3);
+    EXPECT_LE(result, 0.05);
+}
+
+TEST(TConsistentHashingRing, FilesAdditionBarrierStress)
+{
+    auto generator = TUniformGenerator();
+
+    size_t queriesGenerated = 0;
+    const size_t queriesBeforeBarrier = 100;
+    const size_t additionalServerCount = 180;
+
+    auto generateQuery = [&] () {
+        ++queriesGenerated;
+        auto item = GenerateItem();
+        if (queriesGenerated >= queriesBeforeBarrier && queriesGenerated < additionalServerCount + queriesBeforeBarrier) {
+            return std::pair<EQueryType, TCrpItemWithToken>(EQueryType::AddServer, item);
+        }
+        return std::pair<EQueryType, TCrpItemWithToken>(static_cast<EQueryType>(2 + generator(2)), item);
+    };
+
+    auto result = GetPercentageInconsistentFiles(
+        /*serverGenerator*/ GenerateItem,
+        /*fileGenerator*/ GenerateItem,
+        /*fileCount*/ 1000,
+        /*serverCount*/ 1000,
+        /*queryCount*/ 600,
+        /*queryGenerator*/ generateQuery,
+        /*candidateCount*/ 3);
+    EXPECT_LE(result, 0.07);
+}
+
+template <int N>
+TCrpItemWithToken GenerateFile() {
+    auto item = GenerateItem();
+    item.second = N;
+    return item;
+}
+
+template <int N, int barrierAfter, int cntAddings>
+std::pair<EQueryType, TCrpItemWithToken> GenerateQuery() {
+    auto generator = TUniformGenerator();
+
+    static int queriesGenerated = 0;
+
+    ++queriesGenerated;
+    auto item = GenerateItem();
+    if (queriesGenerated >= barrierAfter && queriesGenerated < cntAddings + barrierAfter) {
+        return std::pair<EQueryType, TCrpItemWithToken>(EQueryType::AddServer, item);
+    }
+
+    auto queryType = static_cast<EQueryType>(generator(4));
+    if (queryType == EQueryType::DeleteFile || queryType == EQueryType::AddFile) {
+        item.second = N;
+    }
+    return std::pair<EQueryType, TCrpItemWithToken>(queryType, item);
+}
+
+TEST(TConsistentHashingRing, ManyNodesSimultaneouslyStress)
+{
+    auto singleReplicaResult = GetPercentageInconsistentFiles(
+        /*serverGenerator*/ GenerateItem,
+        /*fileGenerator*/ GenerateFile<100>,
+        /*fileCount*/ 1000,
+        /*serverCount*/ 1000,
+        /*queryCount*/ 600,
+        /*queryGenerator*/ GenerateQuery<100, 40, 250>,
+        /*candidateCount*/ 1,
+        /*batchSize*/ 200);
+
+    auto multipleReplicaResult = GetPercentageInconsistentFiles(
+        /*serverGenerator*/ GenerateItem,
+        /*fileGenerator*/ GenerateFile<101>,
+        /*fileCount*/ 1000,
+        /*serverCount*/ 1000,
+        /*queryCount*/ 600,
+        /*queryGenerator*/ GenerateQuery<101, 40, 250>,
+        /*candidateCount*/ 3,
+        /*batchSize*/ 300);
+
+    EXPECT_GE(singleReplicaResult, 0.1);
+    EXPECT_GE(multipleReplicaResult, 0.3);
+}
+
+TEST(TConsistentHashingRing, SmallTokenCount)
+{
+    const size_t testCases = 10;
+
+    auto singleReplicaLargeResult = 0.0;
+    auto manyReplicasLargeResult = 0.0;
+    auto singleReplicaSmallResult = 0.0;
+    auto manyReplicasSmallResult = 0.0;
+
+    for (size_t i = 0; i < testCases; ++i) {
+        singleReplicaLargeResult += GetPercentageInconsistentFiles(
+            /*serverGenerator*/ GenerateItem,
+            /*fileGenerator*/ GenerateFile<100>,
+            /*fileCount*/ 1000,
+            /*serverCount*/ 1000,
+            /*queryCount*/ 600,
+            /*queryGenerator*/ GenerateQuery<100, 40, 250>,
+            /*candidateCount*/ 1,
+            /*batchSize*/ 200) / testCases;
+
+        manyReplicasLargeResult += GetPercentageInconsistentFiles(
+            /*serverGenerator*/ GenerateItem,
+            /*fileGenerator*/ GenerateFile<101>,
+            /*fileCount*/ 1000,
+            /*serverCount*/ 1000,
+            /*queryCount*/ 600,
+            /*queryGenerator*/ GenerateQuery<101, 40, 250>,
+            /*candidateCount*/ 3,
+            /*batchSize*/ 200) / testCases;
+
+        singleReplicaSmallResult += GetPercentageInconsistentFiles(
+            /*serverGenerator*/ GenerateItem,
+            /*fileGenerator*/ GenerateFile<10>,
+            /*fileCount*/ 1000,
+            /*serverCount*/ 1000,
+            /*queryCount*/ 600,
+            /*queryGenerator*/ GenerateQuery<10, 40, 250>,
+            /*candidateCount=*/ 1,
+            /*batchSize*/ 200) / testCases;
+
+        manyReplicasSmallResult += GetPercentageInconsistentFiles(
+            /*serverGenerator*/ GenerateItem,
+            /*fileGenerator*/ GenerateFile<11>,
+            /*fileCount*/ 1000,
+            /*serverCount*/ 1000,
+            /*queryCount*/ 600,
+            /*queryGenerator*/ GenerateQuery<11, 40, 250>,
+            /*candidateCount*/ 3,
+            /*batchSize*/ 200) / testCases;
+    }
+
+    EXPECT_LE(std::fabs(singleReplicaLargeResult - singleReplicaSmallResult), 0.1);
+    EXPECT_LE(std::fabs(manyReplicasLargeResult - manyReplicasSmallResult), 0.1);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT

+ 1 - 0
yt/yt/core/misc/unittests/ya.make

@@ -22,6 +22,7 @@ SRCS(
     checksum_ut.cpp
     codicil_ut.cpp
     concurrent_cache_ut.cpp
+    consistent_hashing_ut.cpp
     default_map_ut.cpp
     digest_ut.cpp
     ema_counter_ut.cpp