Browse Source

Refactor SkeletonFront message tracking (#1953)

Alexander Rutkovsky 1 year ago
parent
commit
450bec937d

+ 4 - 0
ydb/core/blobstorage/vdisk/common/vdisk_events.h

@@ -332,6 +332,7 @@ namespace NKikimr {
         const NKikimrBlobStorage::EVDiskQueueId ExtQueueId = NKikimrBlobStorage::EVDiskQueueId::Unknown;
         const NKikimrBlobStorage::EVDiskInternalQueueId IntQueueId = NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown;
         const TActorId ActorId;
+        const ui64 InternalMessageId = 0;
 
         TVMsgContext() = default;
 
@@ -343,6 +344,7 @@ namespace NKikimr {
             , ExtQueueId(msgQoS.GetExtQueueId())
             , IntQueueId(msgQoS.GetIntQueueId())
             , ActorId(ActorIdFromProto(msgQoS.GetSenderActorId()))
+            , InternalMessageId(msgQoS.GetInternalMessageId())
         {}
 
         void Output(IOutputStream &str) const {
@@ -352,6 +354,7 @@ namespace NKikimr {
                 << " Cost# " << Cost
                 << " ExtQueueId# " << ExtQueueId
                 << " IntQueueId# " << IntQueueId
+                << " InternalMessageId# " << InternalMessageId
                 << "}";
         }
 
@@ -498,6 +501,7 @@ namespace NKikimr {
                 resultQoS->Swap(queryRecord->MutableMsgQoS());
                 resultQoS->ClearDeadlineSeconds();
                 resultQoS->ClearSendMeCostSettings();
+                resultQoS->ClearInternalMessageId();
             } else {
                 Y_ABORT_UNLESS(!SkeletonFrontIDPtr);
             }

+ 29 - 69
ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp

@@ -104,14 +104,14 @@ namespace NKikimr {
             TActorId ActorId;
             NWilson::TSpan Span;
             std::shared_ptr<TVDiskSkeletonTrace> Trace;
-            ui64 Cookie;
+            ui64 InternalMessageId;
 
             TRecord() = default;
 
             TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
                     ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
                     const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
-                    ui64 cookie)
+                    ui64 internalMessageId)
                 : Ev(std::move(ev))
                 , ReceivedTime(now)
                 , Deadline(deadline)
@@ -123,7 +123,7 @@ namespace NKikimr {
                 , ActorId(Ev->Sender)
                 , Span(TWilson::VDiskTopLevel, std::move(Ev->TraceId), "VDisk.SkeletonFront.Queue")
                 , Trace(std::move(trace))
-                , Cookie(cookie)
+                , InternalMessageId(internalMessageId)
             {
                 Span.Attribute("QueueName", std::move(name));
                 Ev->TraceId = Span.GetTraceId();
@@ -144,7 +144,6 @@ namespace NKikimr {
         ////////////////////////////////////////////////////////////////////////////
         class TIntQueueClass {
             using TQueueType = TQueueInplace<TRecord, 4096>;
-            using TFreeTraceObjectsQueue = TQueueInplace<std::shared_ptr<TVDiskSkeletonTrace>, 4096>;
 
             struct TMsgInfo {
                 ui64 MsgId;
@@ -169,7 +168,6 @@ namespace NKikimr {
             const ui64 MaxInFlightCount;
             const ui64 MaxInFlightCost;
             THashMap<ui64, TMsgInfo> Msgs;
-            std::unique_ptr<TFreeTraceObjectsQueue, TFreeTraceObjectsQueue::TCleanDestructor> FreeTraceObjects;
         public:
             const NKikimrBlobStorage::EVDiskInternalQueueId IntQueueId;
             const TString Name;
@@ -205,7 +203,6 @@ namespace NKikimr {
                 , Deadlines(0)
                 , MaxInFlightCount(maxInFlightCount)
                 , MaxInFlightCost(maxInFlightCost)
-                , FreeTraceObjects(new TFreeTraceObjectsQueue())
                 , IntQueueId(intQueueId)
                 , Name(name)
                 , SkeletonFrontInFlightCount(skeletonFrontGroup->GetCounter("SkeletonFront/" + Name + "/InFlightCount", false))
@@ -220,24 +217,12 @@ namespace NKikimr {
                 return Queue->GetSize();
             }
 
-            std::shared_ptr<TVDiskSkeletonTrace> GetFreeTrace() {
-                if (std::shared_ptr<TVDiskSkeletonTrace> *ptr = FreeTraceObjects->Head()) {
-                    std::shared_ptr<TVDiskSkeletonTrace> tmp = std::move(*ptr);
-                    Y_DEBUG_ABORT_UNLESS(tmp.use_count() == 1);
-                    FreeTraceObjects->Pop();
-                    return tmp;
-                } else {
-                    return std::make_shared<TVDiskSkeletonTrace>();
-                }
-            }
-
             template<typename TFront>
             void Enqueue(const TActorContext &ctx, ui32 recByteSize, std::unique_ptr<IEventHandle> converted,
                          const NBackpressure::TMessageId &msgId, ui64 cost, const TInstant &deadline,
-                         NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& front,
-                         const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace) {
-                Y_UNUSED(front);
-                ui64 cookie = converted->Cookie;
+                         NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& /*front*/,
+                         const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
+                         ui64 internalMessageId) {
                 if (!Queue->Head() && CanSendToSkeleton(cost)) {
                     // send to Skeleton for further processing
                     ctx.ExecutorThread.Send(converted.release());
@@ -249,7 +234,7 @@ namespace NKikimr {
                     *SkeletonFrontInFlightCost += cost;
                     *SkeletonFrontInFlightBytes += recByteSize;
 
-                    Msgs.emplace(cookie, TMsgInfo(msgId.MsgId, ctx.Now(), std::move(trace)));
+                    Msgs.emplace(internalMessageId, TMsgInfo(msgId.MsgId, ctx.Now(), std::move(trace)));
                 } else {
                     // enqueue
                     ++DelayedCount;
@@ -260,7 +245,7 @@ namespace NKikimr {
 
                     TInstant now = TAppData::TimeProvider->Now();
                     Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
-                        clientId, Name, std::move(trace), cookie));
+                        clientId, Name, std::move(trace), internalMessageId));
                 }
             }
 
@@ -308,7 +293,7 @@ namespace NKikimr {
                             *SkeletonFrontInFlightCost += cost;
                             *SkeletonFrontInFlightBytes += recByteSize;
 
-                            Msgs.emplace(rec->Cookie, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
+                            Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
                         }
                         Queue->Pop();
                     } else {
@@ -319,7 +304,7 @@ namespace NKikimr {
 
         public:
             template <class TFront>
-            void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front, ui64 cookie) {
+            void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front) {
                 Y_ABORT_UNLESS(InFlightCount >= 1 && InFlightBytes >= msgCtx.RecByteSize && InFlightCost >= msgCtx.Cost,
                          "IntQueueId# %s InFlightCount# %" PRIu64 " InFlightBytes# %" PRIu64
                          " InFlightCost# %" PRIu64 " msgCtx# %s Deadlines# %" PRIu64,
@@ -335,16 +320,8 @@ namespace NKikimr {
                 *SkeletonFrontInFlightBytes -= msgCtx.RecByteSize;
                 *SkeletonFrontCostProcessed += msgCtx.Cost;
 
-                // TODO(kruall): fix it, cookie always must be found
-                auto it = Msgs.find(cookie);
-                if (it != Msgs.end()) {
-                    Y_VERIFY_S(it!= Msgs.end(), "cookie# " << cookie);
-                    if (auto trace = std::exchange(it->second.VDiskSkeletonTrace, nullptr); trace && trace.use_count() == 1) {
-                        trace->Clear();
-                        FreeTraceObjects->Push(std::move(trace));
-                    }
-                    Msgs.erase(it);
-                }
+                const size_t numErased = Msgs.erase(msgCtx.InternalMessageId);
+                Y_ABORT_UNLESS(numErased == 1);
 
                 ProcessNext(ctx, front, false);
             }
@@ -352,8 +329,7 @@ namespace NKikimr {
             bool Sanitize(const TActorContext &ctx, const TString &vDiskLogPrefix) {
                 bool hasError = false;
                 TInstant now = ctx.Now();
-                for (auto &pair : Msgs) {
-                    const TMsgInfo &msgInfo = pair.second;
+                for (const auto& [internalMessageId, msgInfo] : Msgs) {
                     TDuration passedTime = now - msgInfo.ReceivedTime;
                     if (passedTime > TDuration::Minutes(5)) {
                         hasError = true;
@@ -362,7 +338,7 @@ namespace NKikimr {
                                 (MsgId, msgInfo.MsgId),,
                                 (QueueName, Name),
                                 (PassedTimeSeconds, passedTime.Seconds()),
-                                (Trace, (msgInfo.VDiskSkeletonTrace && msgInfo.VDiskSkeletonTrace->ToString() ? msgInfo.VDiskSkeletonTrace->ToString() : "None")));
+                                (Trace, (msgInfo.VDiskSkeletonTrace ? msgInfo.VDiskSkeletonTrace->ToString() : "None")));
                     }
                 }
                 return hasError;
@@ -475,9 +451,6 @@ namespace NKikimr {
             ::NMonitoring::TDynamicCounters::TCounterPtr SkeletonFrontOverflow;
             ::NMonitoring::TDynamicCounters::TCounterPtr SkeletonFrontIncorrectMsgId;
 
-            ui64 NextInternalId = 0;
-            THashMap<ui64, ui64> InternalIdToCookie;
-
             void NotifyOtherClients(const TActorContext &ctx, const TFeedback &feedback) {
                 for (const auto &x : feedback.second) {
                     SendWindowChange(ctx, x, ExtQueueId);
@@ -549,11 +522,6 @@ namespace NKikimr {
                     }
                     front.ReplyFunc(std::exchange(converted, nullptr), ctx, status, errorReason, now, feedback.first);
                 }
-
-                if (converted) {
-                    ui64 id = ++NextInternalId;
-                    InternalIdToCookie[id] = std::exchange(const_cast<ui64&>(converted->Cookie), id);
-                }
                 return converted;
             }
 
@@ -561,7 +529,6 @@ namespace NKikimr {
             void DeadlineHappened(const TActorContext &ctx, TRecord *rec, TInstant now, TFront &front) {
                 ++*SkeletonFrontDeadline;
                 auto feedback = QueueBackpressure->Processed(rec->ActorId, rec->MsgId, rec->Cost, now);
-                ReturnCookie(rec->Ev, false);
                 front.ReplyFunc(std::move(rec->Ev), ctx, NKikimrProto::DEADLINE, "deadline exceeded", now, feedback.first);
                 NotifyOtherClients(ctx, feedback);
             }
@@ -569,7 +536,6 @@ namespace NKikimr {
             template <class TFront>
             void DroppedWithError(const TActorContext &ctx, TRecord *rec, TInstant now, TFront &front) {
                 auto feedback = QueueBackpressure->Processed(rec->ActorId, rec->MsgId, rec->Cost, now);
-                ReturnCookie(rec->Ev, false);
                 front.ReplyFunc(std::move(rec->Ev), ctx, NKikimrProto::ERROR, "error state", now, feedback.first);
             }
 
@@ -585,18 +551,7 @@ namespace NKikimr {
                 QueueBackpressure->ForEachWindow(callback);
             }
 
-            void ReturnCookie(std::unique_ptr<IEventHandle> &evHandle, bool required) {
-                if (auto it = InternalIdToCookie.find(evHandle->Cookie); it != InternalIdToCookie.end()) {
-                    const_cast<ui64&>(evHandle->Cookie) = it->second;
-                    InternalIdToCookie.erase(it);
-                } else {
-                    Y_ABORT_UNLESS(!required, "Internal error, it can't find internal id");
-                }
-            }
-
             void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, std::unique_ptr<IEventHandle> &evHandle) {
-                ReturnCookie(evHandle, true);
-
                 TInstant now = TAppData::TimeProvider->Now();
                 Y_ABORT_UNLESS(msgCtx.ActorId);
                 auto feedback = QueueBackpressure->Processed(msgCtx.ActorId, msgCtx.MsgId, msgCtx.Cost, now);
@@ -708,6 +663,11 @@ namespace NKikimr {
         bool HasUnreadableBlobs = false;
         TInstant LastSanitizeTime = TInstant::Zero();
         TInstant LastSanitizeWithErrorTime = TInstant::Zero();
+        ui64 NextUniqueMessageId = 1;
+
+        ui64 AllocateMessageId() {
+            return NextUniqueMessageId++;
+        }
 
         ////////////////////////////////////////////////////////////////////////
         // NOTIFICATIONS
@@ -1229,8 +1189,8 @@ namespace NKikimr {
                 || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchDiff>
                 || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchXorDiff>;
 
-        template <class TEventPtr>
-        void HandleRequestWithQoS(const TActorContext &ctx, TEventPtr &ev, const char *msgName, ui64 cost,
+        template <class TEvent>
+        void HandleRequestWithQoS(const TActorContext &ctx, TAutoPtr<TEventHandle<TEvent>> &ev, const char *msgName, ui64 cost,
                                   TIntQueueClass &intQueue) {
             CheckEvent(ev, msgName);
             const ui64 advancedCost = VCtx->CostTracker->GetCost(*ev->Get());
@@ -1248,6 +1208,8 @@ namespace NKikimr {
             msgQoS.SetCost(cost);
             msgQoS.SetIntQueueId(intQueueId);
             ActorIdToProto(ev->Sender, msgQoS.MutableSenderActorId());
+            const ui64 internalMessageId = AllocateMessageId();
+            msgQoS.SetInternalMessageId(internalMessageId);
             FillInCostSettingsAndTimestampIfRequired(&msgQoS, now);
 
             // check queue compatibility: it's a contract between BlobStorage Proxy and VDisk,
@@ -1268,15 +1230,14 @@ namespace NKikimr {
                 ev->Forward(SkeletonId).Release()), msgId, cost, *this, clientId);
             if (event) {
                 std::shared_ptr<TVDiskSkeletonTrace> trace;
-                if constexpr (IsPatchEvent<std::decay_t<decltype(*ev->Get())>>) {
-                    trace = intQueue.GetFreeTrace();
-                    event->Get<std::decay_t<decltype(*ev->Get())>>()->VDiskSkeletonTrace = trace;
+                if constexpr (IsPatchEvent<TEvent>) {
+                    event->Get<TEvent>()->VDiskSkeletonTrace = std::make_shared<TVDiskSkeletonTrace>();
                 }
                 // good, enqueue it in intQueue
-                intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost,
-                        deadline, extQueueId, *this, clientId, std::move(trace));
+                intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost, deadline, extQueueId, *this, clientId,
+                    std::move(trace), internalMessageId);
 
-                if constexpr (std::is_same_v<std::decay_t<decltype(*ev->Get())>, TEvBlobStorage::TEvVPatchXorDiff>) {
+                if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvVPatchXorDiff>) {
                     // TEvVPatchXorDiff's cost is included in cost of other Patch operations
                 } else {
                     if (clientId.GetType() == NBackpressure::EQueueClientType::DSProxy) {
@@ -1627,11 +1588,10 @@ namespace NKikimr {
         void Handle(TEvVDiskRequestCompleted::TPtr &ev, const TActorContext &ctx) {
             const TVMsgContext &msgCtx = ev->Get()->Ctx;
             std::unique_ptr<IEventHandle> event = std::move(ev->Get()->Event);
-            ui64 id = event->Cookie;
             TExtQueueClass &extQueue = GetExtQueue(msgCtx.ExtQueueId);
             extQueue.Completed(ctx, msgCtx, event);
             TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId);
-            intQueue.Completed(ctx, msgCtx, *this, id);
+            intQueue.Completed(ctx, msgCtx, *this);
             VCtx->CostTracker->CountPDiskResponse();
             if (!ev->Get()->DoNotResend) {
                 TActivationContext::Send(event.release());

+ 1 - 0
ydb/core/protos/blobstorage.proto

@@ -77,6 +77,7 @@ message TMsgQoS {
     }
     optional TExecTimeStats ExecTimeStats = 12;
     optional NActorsProto.TActorId SenderActorId = 15;
+    optional uint64 InternalMessageId = 16; // for in-process use
 }
 
 // message VDisk sends to the client, when backpressure window changes