Browse Source

Handle long-lasting Put queries in DS proxy correctly -- part 1 KIKIMR-9016

alexvru 1 year ago
parent
commit
e8437d81a0

+ 88 - 83
ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

@@ -92,7 +92,7 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
         ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
     // Add actual data to Parts
     Y_ABORT_UNLESS(id.PartId() != 0);
-    ui32 partIdx = id.PartId() - 1;
+    const ui32 partIdx = id.PartId() - 1;
     Y_ABORT_UNLESS(partIdx < Parts.size());
     const ui32 partSize = info.Type.PartSize(id);
     const ui32 dataSize = data.size();
@@ -100,114 +100,89 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
         Parts[partIdx].AddResponseData(partSize, shift, std::move(data));
     }
     IsChanged = true;
+
+    const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash());
+    Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize());
+    TDisk& disk = Disks[diskIdx];
+    Y_ABORT_UNLESS(disk.OrderNumber == orderNumber);
+
     // Mark part as present for the disk
-    bool isFound = false;
-    for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
-        TDisk &disk = Disks[diskIdx];
-        if (disk.OrderNumber == orderNumber) {
-            isFound = true;
-            Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
-            TDiskPart &diskPart = disk.DiskParts[partIdx];
-            //Cerr << Endl << "present diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
-            diskPart.Situation = ESituation::Present;
-            if (partSize) {
-                TIntervalVec<i32> responseInterval(shift, shift + dataSize);
-                diskPart.Requested.Subtract(responseInterval);
-            }
-            break;
-        }
+    Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+    TDiskPart &diskPart = disk.DiskParts[partIdx];
+    diskPart.Situation = ESituation::Present;
+    if (partSize) {
+        TIntervalVec<i32> responseInterval(shift, shift + dataSize);
+        diskPart.Requested.Subtract(responseInterval);
     }
-    Y_ABORT_UNLESS(isFound);
+
     Keep |= keep;
     DoNotKeep |= doNotKeep;
 }
 
 void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
-    Y_UNUSED(info);
     Y_ABORT_UNLESS(id.PartId() != 0);
-    ui32 partIdx = id.PartId() - 1;
+    const ui32 partIdx = id.PartId() - 1; // PartId() is asserted nonzero above, so this cannot underflow
     IsChanged = true;
-    // Mark part as absent for the disk
-    bool isFound = false;
-    for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
-        TDisk &disk = Disks[diskIdx];
-        if (disk.OrderNumber == orderNumber) {
-            isFound = true;
-            Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
-            TDiskPart &diskPart = disk.DiskParts[partIdx];
-            //Cerr << Endl << "absent diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
-            diskPart.Situation = ESituation::Absent;
-            diskPart.Requested.Clear();
-            break;
-        }
-    }
-    Y_ABORT_UNLESS(isFound);
+
+    const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); // direct index lookup replaces the removed linear scan over Disks
+    Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); // NOTE(review): BlobSubgroupSize() looks like the "not in subgroup" sentinel -- confirm
+    TDisk& disk = Disks[diskIdx];
+    Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); // the slot found by index must belong to the VDisk that answered
+
+    Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+    TDiskPart &diskPart = disk.DiskParts[partIdx];
+    diskPart.Situation = ESituation::Absent; // mark the part as absent on this disk
+    diskPart.Requested.Clear(); // drop every requested interval recorded for this part
 }
 
 void TBlobState::AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
-    Y_UNUSED(info);
     Y_ABORT_UNLESS(id.PartId() != 0);
-    ui32 partIdx = id.PartId() - 1;
+    const ui32 partIdx = id.PartId() - 1; // PartId() is asserted nonzero above, so this cannot underflow
     IsChanged = true;
-    // Mark part as put ok for the disk
-    bool isFound = false;
-    for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
-        TDisk &disk = Disks[diskIdx];
-        if (disk.OrderNumber == orderNumber) {
-            isFound = true;
-            Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
-            TDiskPart &diskPart = disk.DiskParts[partIdx];
-            //Cerr << Endl << "put ok diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
-            diskPart.Situation = ESituation::Present;
-            break;
-        }
-    }
-    Y_ABORT_UNLESS(isFound);
+
+    const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); // direct index lookup replaces the removed linear scan over Disks
+    Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); // NOTE(review): BlobSubgroupSize() looks like the "not in subgroup" sentinel -- confirm
+    TDisk& disk = Disks[diskIdx];
+    Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); // the slot found by index must belong to the VDisk that answered
+
+    Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
+    TDiskPart& diskPart = disk.DiskParts[partIdx];
+    diskPart.Situation = ESituation::Present; // mark the part as present on this disk; Requested is left untouched here
 }
 
 void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
-    Y_UNUSED(info);
     Y_ABORT_UNLESS(id.PartId() != 0);
     ui32 partIdx = id.PartId() - 1;
     IsChanged = true;
-    // Mark part as error for the disk
-    bool isFound = false;
-    for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
-        TDisk &disk = Disks[diskIdx];
-        if (disk.OrderNumber == orderNumber) {
-            isFound = true;
-            Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
-            TDiskPart &diskPart = disk.DiskParts[partIdx];
-            //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
-            diskPart.Situation = ESituation::Error;
-            diskPart.Requested.Clear();
-            break;
-        }
-    }
-    Y_ABORT_UNLESS(isFound);
+
+    const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); // direct index lookup replaces the removed linear scan over Disks
+    Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); // NOTE(review): BlobSubgroupSize() looks like the "not in subgroup" sentinel -- confirm
+    TDisk& disk = Disks[diskIdx];
+    Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); // the slot found by index must belong to the VDisk that answered
+
+    Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); // NOTE(review): partIdx stays non-const in this method only; the other Add*Response methods in this hunk made it const
+    TDiskPart &diskPart = disk.DiskParts[partIdx];
+    diskPart.Situation = ESituation::Error; // mark the part as errored on this disk
+    diskPart.Requested.Clear(); // drop every requested interval recorded for this part
 }
 
 void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
         bool keep, bool doNotKeep) {
-    Y_UNUSED(info);
     Y_ABORT_UNLESS(id.PartId() != 0);
-    ui32 partIdx = id.PartId() - 1;
+    const ui32 partIdx = id.PartId() - 1; // PartId() is asserted nonzero above, so this cannot underflow
     IsChanged = true;
-    // Mark part as error for the disk
-    bool isFound = false;
-    for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
-        TDisk &disk = Disks[diskIdx];
-        if (disk.OrderNumber == orderNumber) {
-            isFound = true;
-            Y_ABORT_UNLESS(partIdx < disk.DiskParts.size());
-            TDiskPart &diskPart = disk.DiskParts[partIdx];
-            //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
-            diskPart.Situation = ESituation::Lost;
-            diskPart.Requested.Clear();
-            break;
-        }
-    }
-    Y_ABORT_UNLESS(isFound);
+
+    const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); // direct index lookup replaces the removed linear scan over Disks
+    Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); // NOTE(review): BlobSubgroupSize() looks like the "not in subgroup" sentinel -- confirm
+    TDisk& disk = Disks[diskIdx];
+    Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); // the slot found by index must belong to the VDisk that answered
+
+    Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); // NOTE(review): the commented-out Cerr trace two lines below is kept only in this method; the sibling methods dropped theirs
+    TDiskPart &diskPart = disk.DiskParts[partIdx];
+    //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl;
+    diskPart.Situation = ESituation::Lost; // mark the part as lost on this disk (the removed comment said "error", but the value set was already Lost)
+    diskPart.Requested.Clear(); // drop every requested interval recorded for this part
+
     Keep |= keep;
     DoNotKeep |= doNotKeep;
 }
@@ -236,6 +211,20 @@ void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TG
     }
 }
 
+bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const { // true when the Present parts on non-expired disks are reported as EBS_FULL by the group's quorum checker
+    TSubgroupPartLayout layout; // collects the (disk, part) pairs that count towards the quorum
+    for (ui32 diskIdx = 0, numDisks = Disks.size(); diskIdx < numDisks; ++diskIdx) {
+        const TDisk& disk = Disks[diskIdx];
+        for (ui32 partIdx = 0, numParts = disk.DiskParts.size(); partIdx < numParts; ++partIdx) {
+            const TDiskPart& part = disk.DiskParts[partIdx];
+            if (part.Situation == ESituation::Present && !expired[disk.OrderNumber]) { // parts on disks flagged in `expired` are not counted
+                layout.AddItem(diskIdx, partIdx, info.Type);
+            }
+        }
+    }
+    return info.GetQuorumChecker().GetBlobState(layout, {&info.GetTopology()}) == TBlobStorageGroupInfo::EBS_FULL; // NOTE(review): second argument is presumably an empty failed-disk set -- confirm against GetBlobState
+}
+
 TString TBlobState::ToString() const {
     TStringStream str;
     str << "{Id# " << Id.ToString();
@@ -615,5 +604,21 @@ TString TBlackboard::ToString() const {
     return str.Str();
 }
 
+void TBlackboard::InvalidatePartStates(ui32 orderNumber) { // resets Sent/Present parts of the given VDisk back to Unknown in every tracked blob state
+    const TVDiskID vdiskId = Info->GetVDiskId(orderNumber);
+    for (auto& [id, state] : BlobStates) {
+        Y_ABORT_UNLESS(!state.IsDone); // no entry in BlobStates may already be marked done when this runs
+        if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) { // blobs for which the lookup yields BlobSubgroupSize() are skipped (presumably "disk not in this blob's subgroup" -- confirm)
+            TBlobState::TDisk& disk = state.Disks[diskIdx];
+            for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
+                TBlobState::TDiskPart& part = disk.DiskParts[partIdx];
+                if (part.Situation == TBlobState::ESituation::Sent || part.Situation == TBlobState::ESituation::Present) { // only these two situations are reset; any other value is left as is
+                    part.Situation = TBlobState::ESituation::Unknown;
+                    state.IsChanged = true; // flag the blob state as modified
+                }
+            }
+        }
+    }
+}
 
 }//NKikimr

+ 7 - 0
ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h

@@ -5,6 +5,7 @@
 
 #include <ydb/core/blobstorage/base/batched_vec.h>
 #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h>
 #include <ydb/core/util/fragmented_buffer.h>
 #include <ydb/core/util/interval_set.h>
 #include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -63,12 +64,14 @@ struct TBlobState {
     struct TDiskPart {
         TIntervalSet<i32> Requested;
         ESituation Situation = ESituation::Unknown;
+
         TString ToString() const;
     };
     struct TDisk {
         ui32 OrderNumber;
         bool IsSlow = false;
         TStackVec<TDiskPart, TypicalPartsInBlob> DiskParts;
+
         TString ToString() const;
     };
 
@@ -105,6 +108,8 @@ struct TBlobState {
             NKikimrBlobStorage::EVDiskQueueId queueId,
             ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const;
     TString ToString() const;
+    bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;
+            
     static TString SituationToString(ESituation situation);
 };
 
@@ -228,6 +233,8 @@ struct TBlackboard {
         }
     }
 
+    void InvalidatePartStates(ui32 orderNumber);
+
     void RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span);
 
     TBlobState& operator [](const TLogoBlobID& id);

+ 68 - 40
ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

@@ -70,6 +70,12 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
 
     bool RequireExtraBlockChecks = false;
 
+    struct TIncarnationRecord { // per-VDisk bookkeeping maintained by HandleIncarnation
+        ui64 IncarnationGuid; // incarnation guid most recently reported by the VDisk
+        TMonotonic ExpirationTimestamp; // latest timestamp passed to HandleIncarnation for this guid; CreateExpiredVDiskSet treats values <= its argument as expired
+    };
+    std::vector<std::optional<TIncarnationRecord>> IncarnationRecords; // indexed by VDisk order number; an entry stays empty until the first result carrying an incarnation guid arrives
+
     void SanityCheck() {
         if (RequestsSent <= MaxSaneRequests) {
             return;
@@ -89,29 +95,58 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
         SanityCheck(); // May Die
     }
 
+    template<typename TEvent, typename TCookie> // TEvent: VDisk put event type to send; TCookie: cookie type forwarded to the pending-response accounting
+    void SendPuts(auto&& callback) { // shared send path: the callback fills the queue, then accounting and dispatch happen here
+        TDeque<std::unique_ptr<TEvent>> v;
+        callback(v); // callback receives the empty deque by reference and appends the events to send
+        UpdatePengingVDiskResponseCount<TEvent, TCookie>(v); // helper name is spelled this way ("Penging") at every call site in this diff
+        RequestsSent += v.size();
+        CountPuts(v);
+        SendToQueues(v, TimeStatsEnabled);
+    }
+
     void Accelerate() {
         if (IsAccelerated) {
             return;
         }
         IsAccelerated = true;
 
+        auto callback = [this](auto& v) { // generic lambda, so one body serves both the TEvVMultiPut and the TEvVPut queue
+            PutImpl.Accelerate(LogCtx, v);
+            *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); // bump the acceleration counter that matches the current put mode
+        };
+
         if (IsMultiPutMode) {
-            TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
-            PutImpl.Accelerate(LogCtx, vMultiPuts);
-            UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts);
-            RequestsSent += vMultiPuts.size();
-            *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size();
-            CountPuts(vMultiPuts);
-            SendToQueues(vMultiPuts, TimeStatsEnabled);
+            SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(callback); // accounting and dispatch now live in SendPuts
         } else {
-            TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
-            PutImpl.Accelerate(LogCtx, vPuts);
-            UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts);
-            RequestsSent += vPuts.size();
-            *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
-            CountPuts(vPuts);
-            SendToQueues(vPuts, TimeStatsEnabled);
+            SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>(callback); // accounting and dispatch now live in SendPuts
+        }
+    }
+
+    void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) { // records the incarnation guid reported by a VDisk and reacts when it differs from the stored one
+        Y_ABORT_UNLESS(orderNumber < IncarnationRecords.size());
+        if (auto& record = IncarnationRecords[orderNumber]; !record) { // first guid seen for this disk: just remember it
+            record = TIncarnationRecord{
+                .IncarnationGuid = incarnationGuid,
+                .ExpirationTimestamp = timestamp,
+            };
+        } else if (record->IncarnationGuid != incarnationGuid) { // guid changed (presumably the VDisk restarted -- confirm): earlier part states for this disk are no longer trusted
+            PutImpl.InvalidatePartStates(orderNumber);
+            record->IncarnationGuid = incarnationGuid;
+            record->ExpirationTimestamp = timestamp;
+        } else if (record->ExpirationTimestamp < timestamp) { // same guid: the stored timestamp only ever moves forward
+            record->ExpirationTimestamp = timestamp;
+        }
+    }
+
+    TBlobStorageGroupInfo::TGroupVDisks CreateExpiredVDiskSet(TMonotonic timestamp) const { // builds the set of VDisks whose recorded ExpirationTimestamp is <= timestamp
+        TBlobStorageGroupInfo::TGroupVDisks res(&Info->GetTopology());
+        for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { // i is the VDisk order number
+            if (auto& record = IncarnationRecords[i]; record && record->ExpirationTimestamp <= timestamp) { // disks with no record yet are never included
+                res |= {&Info->GetTopology(), Info->GetVDiskId(i)};
+            }
         }
+        return res;
     }
 
     void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
@@ -148,6 +183,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
         TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
         const TVDiskIdShort shortId(vDiskId);
 
+        if (record.HasIncarnationGuid()) {
+            HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid());
+        }
+
         LWPROBE(DSProxyVDiskRequestDuration, TEvBlobStorage::EvVPut, blob.BlobSize(), blob.TabletID(),
                 Info->GroupID, blob.Channel(), Info->GetFailDomainOrderNumber(shortId),
                 GetStartTime(record.GetTimestamps()),
@@ -162,13 +201,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
                 GetVDiskTimeMs(record.GetTimestamps()));
         }
 
-        TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
         TPutImpl::TPutResultVec putResults;
-        PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), vPuts, putResults);
-        UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts);
-        RequestsSent += vPuts.size();
-        CountPuts(vPuts);
-        SendToQueues(vPuts, TimeStatsEnabled);
+        SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>([&](auto& v) {
+            PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults);
+        });
         if (ReplyAndDieWithLastResponse(putResults)) {
             return;
         }
@@ -209,6 +245,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
         TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
         const TVDiskIdShort shortId(vDiskId);
 
+        if (record.HasIncarnationGuid()) {
+            HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid());
+        }
+
         Y_ABORT_UNLESS(record.HasCookie());
         TVMultiPutCookie cookie(record.GetCookie());
         const ui64 vdisk = cookie.GetVDiskOrderNumber();
@@ -277,12 +317,9 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
         }
         putResults.clear();
 
-        TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
-        PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), vMultiPuts, putResults);
-        UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts);
-        RequestsSent += vMultiPuts.size();
-        CountPuts(vMultiPuts);
-        SendToQueues(vMultiPuts, TimeStatsEnabled);
+        SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>([&](auto& v) {
+            PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults);
+        });
         if (ReplyAndDieWithLastResponse(putResults)) {
             return;
         }
@@ -443,6 +480,7 @@ public:
         , IsAccelerateScheduled(false)
         , IsMultiPutMode(false)
         , RequireExtraBlockChecks(!ev->ExtraBlockChecks.empty())
+        , IncarnationRecords(info->GetTotalVDisksNum())
     {
         if (ev->Orbit.HasShuttles()) {
             RootCauseTrack.IsOn = true;
@@ -486,6 +524,7 @@ public:
         , IsAccelerated(false)
         , IsAccelerateScheduled(false)
         , IsMultiPutMode(true)
+        , IncarnationRecords(info->GetTotalVDisksNum())
     {
         Y_DEBUG_ABORT_UNLESS(events.size() <= MaxBatchedPutRequests);
         for (auto &ev : events) {
@@ -606,24 +645,13 @@ public:
         }
     }
 
-    template<typename TEvV, typename TCookie>
-    struct TIssue {
-        void operator ()(TThis& self) {
-            TDeque<std::unique_ptr<TEvV>> events;
-            self.PutImpl.GenerateInitialRequests(self.LogCtx, self.PartSets, events);
-            self.UpdatePengingVDiskResponseCount<TEvV, TCookie>(events);
-            self.RequestsSent += events.size();
-            self.CountPuts(events);
-            self.SendToQueues(events, self.TimeStatsEnabled);
-        }
-    };
-
     void ResumeBootstrap() {
         if (EncodeQuantum()) {
+            auto callback = [this](auto& v) { PutImpl.GenerateInitialRequests(LogCtx, PartSets, v); };
             if (IsMultiPutMode) {
-                TIssue<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>()(*this);
+                SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(callback);
             } else {
-                TIssue<TEvBlobStorage::TEvVPut, TBlobCookie>()(*this);
+                SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>(callback);
             }
         } else {
             TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0));

+ 4 - 1
ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h

@@ -427,10 +427,13 @@ public:
     TString DumpFullState() const;
 
     bool MarkBlobAsSent(ui64 blobIdx);
-    bool MarkBlobAsSent(TMap<TLogoBlobID, TBlobState>::iterator it);
 
     TString ToString() const;
 
+    void InvalidatePartStates(ui32 orderNumber) { // thin forwarder: delegates to the Blackboard member's InvalidatePartStates
+        Blackboard.InvalidatePartStates(orderNumber);
+    }
+
 protected:
     bool RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults);
     bool RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults);