Browse Source

Improve BlobDepot agent queueing logic

alexvru 2 years ago
parent
commit
a2e4ed1195

+ 5 - 5
ydb/core/blob_depot/agent/agent_impl.h

@@ -86,6 +86,7 @@ namespace NKikimr::NBlobDepot {
         struct TEvPrivate {
             enum {
                 EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE),
+                EvProcessPendingEvent,
             };
         };
 
@@ -122,6 +123,7 @@ namespace NKikimr::NBlobDepot {
 
             ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
             hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle);
+            cFunc(TEvPrivate::EvProcessPendingEvent, HandlePendingEvent);
 
             cFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog);
         );
@@ -140,10 +142,7 @@ namespace NKikimr::NBlobDepot {
                     if (TabletId && TabletId != Max<ui64>()) {
                         ConnectToBlobDepot();
                     }
-                
-                    for (auto& ev : std::exchange(PendingEventQ, {})) {
-                        TActivationContext::Send(ev.release());
-                    }
+                    HandlePendingEvent();
                 }
                 if (!info->GetTotalVDisksNum()) {
                     TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0));
@@ -275,8 +274,9 @@ namespace NKikimr::NBlobDepot {
 
         void HandleQueryWatchdog();
         void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
+        void HandlePendingEvent();
+        void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev);
         void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
-        TQuery *CreateQuery(TAutoPtr<IEventHandle> ev);
         template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev);
 
         ////////////////////////////////////////////////////////////////////////////////////////////////////////////////

+ 34 - 13
ydb/core/blob_depot/agent/query.cpp

@@ -3,34 +3,55 @@
 namespace NKikimr::NBlobDepot {
 
     void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) {
-        if (TabletId == Max<ui64>()) {
+        if (TabletId == Max<ui64>() || !PendingEventQ.empty()) {
             // TODO: memory usage control
             PendingEventQ.emplace_back(ev.Release());
         } else {
-            auto *query = CreateQuery(ev);
-            STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId),
-                (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName()));
-            if (!TabletId) {
-                query->EndWithError(NKikimrProto::ERROR, "group is in error state");
+            ProcessStorageEvent(std::unique_ptr<IEventHandle>(ev.Release()));
+        }
+    }
+
+    void TBlobDepotAgent::HandlePendingEvent() {
+        THPTimer timer;
+
+        do {
+            if (!PendingEventQ.empty()) {
+                ProcessStorageEvent(std::move(PendingEventQ.front()));
+                PendingEventQ.pop_front();
             } else {
-                query->Initiate();
+                break;
             }
+        } while (TDuration::Seconds(timer.Passed()) <= TDuration::MicroSeconds(100));
+
+        if (!PendingEventQ.empty()) {
+            TActivationContext::Send(new IEventHandle(TEvPrivate::EvProcessPendingEvent, 0, SelfId(), {}, nullptr, 0));
         }
     }
 
-    void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) {
-        ev->Get()->Process(this);
-    }
+    void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr<IEventHandle> ev) {
+        TQuery *query = nullptr;
 
-    TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) {
         switch (ev->GetTypeRewrite()) {
 #define XX(TYPE) \
-            case TEvBlobStorage::TYPE: return CreateQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release()));
+            case TEvBlobStorage::TYPE: query = CreateQuery<TEvBlobStorage::TYPE>(std::move(ev)); break;
 
             ENUMERATE_INCOMING_EVENTS(XX)
 #undef XX
         }
-        Y_FAIL();
+
+        Y_VERIFY(query);
+
+        STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (VirtualGroupId, VirtualGroupId),
+            (QueryId, query->GetQueryId()), (TabletId, query->GetTabletId()), (Name, query->GetName()));
+        if (!TabletId) {
+            query->EndWithError(NKikimrProto::ERROR, "group is in error state");
+        } else {
+            query->Initiate();
+        }
+    }
+
+    void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) {
+        ev->Get()->Process(this);
     }
 
     void TBlobDepotAgent::HandleQueryWatchdog() {

+ 3 - 0
ydb/core/blob_depot/agent/storage_get.cpp

@@ -57,6 +57,9 @@ namespace NKikimr::NBlobDepot {
                         if (!ProcessSingleResult(i, value)) {
                             return;
                         }
+                    } else {
+                        STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA29, "resolve pending", (VirtualGroupId, Agent.VirtualGroupId),
+                            (QueryId, GetQueryId()), (QueryIdx, i), (BlobId, query.Id));
                     }
                 }
             }