Browse Source

Batch processing of sending notifications upon commit KIKIMR-14597

ref:d6355b1d60ab63873f910527d43ca9cf0f2bbaea
Ilnaz Nizametdinov 2 years ago
parent
commit
138b0b2d40
1 changed files with 72 additions and 6 deletions
  1. 72 6
      ydb/core/tx/scheme_board/replica.cpp

+ 72 - 6
ydb/core/tx/scheme_board/replica.cpp

@@ -36,6 +36,26 @@ class TReplica: public TMonitorableActor<TReplica> {
     using TDescribeSchemeResult = NKikimrScheme::TEvDescribeSchemeResult;
     using TDescribeSchemeResult = NKikimrScheme::TEvDescribeSchemeResult;
     using TCapabilities = NKikimrSchemeBoard::TEvSubscribe::TCapabilities;
     using TCapabilities = NKikimrSchemeBoard::TEvSubscribe::TCapabilities;
 
 
+    struct TEvPrivate {
+        enum EEv {
+            EvSendStrongNotifications = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+
+            EvEnd,
+        };
+
+        static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)");
+
+        struct TEvSendStrongNotifications: public TEventLocal<TEvSendStrongNotifications, EvSendStrongNotifications> {
+            static constexpr ui32 BatchSize = 1000;
+            const ui64 Owner;
+
+            explicit TEvSendStrongNotifications(ui64 owner)
+                : Owner(owner)
+            {
+            }
+        };
+    };
+
 public:
 public:
     enum ESubscriptionType {
     enum ESubscriptionType {
         SUBSCRIPTION_UNSPECIFIED, // for filtration
         SUBSCRIPTION_UNSPECIFIED, // for filtration
@@ -692,6 +712,8 @@ private:
         if (!notify->Record.GetStrong()) {
         if (!notify->Record.GetStrong()) {
             auto& info = desc->GetSubscriberInfo(subscriber);
             auto& info = desc->GetSubscriberInfo(subscriber);
             info.NeedStrongNotification();
             info.NeedStrongNotification();
+
+            WaitStrongNotifications[domainOwnerId].insert(subscriber);
         }
         }
 
 
         Send(subscriber, std::move(notify), flags);
         Send(subscriber, std::move(notify), flags);
@@ -1048,22 +1070,58 @@ private:
         info.Generation = info.PendingGeneration;
         info.Generation = info.PendingGeneration;
 
 
         Send(ev->Sender, new TSchemeBoardEvents::TEvCommitResponse(owner, info.Generation), 0, ev->Cookie);
         Send(ev->Sender, new TSchemeBoardEvents::TEvCommitResponse(owner, info.Generation), 0, ev->Cookie);
+        Send(SelfId(), new TEvPrivate::TEvSendStrongNotifications(owner));
+    }
+
+    void Handle(TEvPrivate::TEvSendStrongNotifications::TPtr& ev) {
+        const auto limit = ev->Get()->BatchSize;
+        const auto owner = ev->Get()->Owner;
+
+        SBR_LOG_D("Handle TEvPrivate::TEvSendStrongNotifications"
+            << ": self# " << SelfId()
+            << ", owner# " << owner);
+
+        if (!IsPopulatorCommited(owner)) {
+            SBR_LOG_N("Populator is not commited"
+                << ": self# " << SelfId()
+                << ", owner# " << owner);
+            return;
+        }
+
+        auto itSubscribers = WaitStrongNotifications.find(owner);
+        if (itSubscribers == WaitStrongNotifications.end()) {
+            SBR_LOG_E("Invalid owner"
+                << ": self# " << SelfId()
+                << ", owner# " << owner);
+            return;
+        }
+
+        auto& subscribers = itSubscribers->second;
+        auto it = subscribers.begin();
+        ui32 count = 0;
+
+        while (count++ < limit && it != subscribers.end()) {
+            const TActorId subscriber = *it;
+            it = subscribers.erase(it);
+
+            auto jt = Subscribers.find(subscriber);
+            if (jt == Subscribers.end()) {
+                continue;
+            }
 
 
-        for (const auto& [subscriber, id] : Subscribers) {
             TDescription* desc = nullptr;
             TDescription* desc = nullptr;
 
 
-            if (const TString* path = std::get_if<TString>(&id)) {
+            if (const TString* path = std::get_if<TString>(&jt->second)) {
                 desc = Descriptions.FindPtr(*path);
                 desc = Descriptions.FindPtr(*path);
-            } else if (const TPathId* pathId = std::get_if<TPathId>(&id)) {
+            } else if (const TPathId* pathId = std::get_if<TPathId>(&jt->second)) {
                 desc = Descriptions.FindPtr(*pathId);
                 desc = Descriptions.FindPtr(*pathId);
             }
             }
 
 
             Y_VERIFY(desc);
             Y_VERIFY(desc);
             auto& info = desc->GetSubscriberInfo(subscriber);
             auto& info = desc->GetSubscriberInfo(subscriber);
 
 
-            if (info.GetDomainOwnerId() != owner
-                || info.IsNotifiedStrongly()
-                || info.IsWaitForAck()) {
+            Y_VERIFY(info.GetDomainOwnerId() == owner);
+            if (info.IsNotifiedStrongly() || info.IsWaitForAck()) {
                 continue;
                 continue;
             }
             }
 
 
@@ -1071,6 +1129,12 @@ private:
             info.EnqueueVersion(notify.Get());
             info.EnqueueVersion(notify.Get());
             Send(subscriber, std::move(notify));
             Send(subscriber, std::move(notify));
         }
         }
+
+        if (subscribers) {
+            Send(SelfId(), new TEvPrivate::TEvSendStrongNotifications(owner));
+        } else {
+            WaitStrongNotifications.erase(itSubscribers);
+        }
     }
     }
 
 
     void Handle(TSchemeBoardEvents::TEvSubscribe::TPtr& ev) {
     void Handle(TSchemeBoardEvents::TEvSubscribe::TPtr& ev) {
@@ -1313,6 +1377,7 @@ public:
             hFunc(TSchemeBoardEvents::TEvHandshakeRequest, Handle);
             hFunc(TSchemeBoardEvents::TEvHandshakeRequest, Handle);
             hFunc(TSchemeBoardEvents::TEvUpdate, Handle);
             hFunc(TSchemeBoardEvents::TEvUpdate, Handle);
             hFunc(TSchemeBoardEvents::TEvCommitRequest, Handle);
             hFunc(TSchemeBoardEvents::TEvCommitRequest, Handle);
+            hFunc(TEvPrivate::TEvSendStrongNotifications, Handle);
             hFunc(TSchemeBoardEvents::TEvSubscribe, Handle);
             hFunc(TSchemeBoardEvents::TEvSubscribe, Handle);
             hFunc(TSchemeBoardEvents::TEvUnsubscribe, Handle);
             hFunc(TSchemeBoardEvents::TEvUnsubscribe, Handle);
             hFunc(TSchemeBoardEvents::TEvNotifyAck, Handle);
             hFunc(TSchemeBoardEvents::TEvNotifyAck, Handle);
@@ -1330,6 +1395,7 @@ private:
     THashMap<ui64, TPopulatorInfo> Populators;
     THashMap<ui64, TPopulatorInfo> Populators;
     TDoubleIndexedMap<TString, TPathId, TDescription, TMerger, THashMap, TMap> Descriptions;
     TDoubleIndexedMap<TString, TPathId, TDescription, TMerger, THashMap, TMap> Descriptions;
     TMap<TActorId, std::variant<TString, TPathId>, TActorId::TOrderedCmp> Subscribers;
     TMap<TActorId, std::variant<TString, TPathId>, TActorId::TOrderedCmp> Subscribers;
+    THashMap<ui64, TSet<TActorId>> WaitStrongNotifications;
 
 
 }; // TReplica
 }; // TReplica