|
@@ -20,6 +20,41 @@ namespace NKikimr {
|
|
|
size_t RewrittenRecsCounter = 0;
|
|
|
size_t RewrittenBytes = 0;
|
|
|
|
|
|
+ struct TCheckLocationMerger {
|
|
|
+ TDefragRecord& Rec;
|
|
|
+ TBlobStorageGroupType GType;
|
|
|
+ bool Found = false;
|
|
|
+
|
|
|
+ TCheckLocationMerger(TDefragRecord& rec, TBlobStorageGroupType gtype)
|
|
|
+ : Rec(rec)
|
|
|
+ , GType(gtype)
|
|
|
+ {}
|
|
|
+
|
|
|
+ void AddFromFresh(const TMemRecLogoBlob& memRec, const TRope*, const TKeyLogoBlob&, ui64) {
|
|
|
+ Process(memRec, nullptr);
|
|
|
+ }
|
|
|
+
|
|
|
+ void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, const TKeyLogoBlob&, ui64) {
|
|
|
+ Process(memRec, outbound);
|
|
|
+ }
|
|
|
+
|
|
|
+ static constexpr bool HaveToMergeData() { return false; }
|
|
|
+
|
|
|
+ void Process(const TMemRecLogoBlob& memRec, const TDiskPart *outbound) {
|
|
|
+ TDiskDataExtractor extr;
|
|
|
+ if (memRec.GetType() == TBlobType::HugeBlob || memRec.GetType() == TBlobType::ManyHugeBlobs) {
|
|
|
+ memRec.GetDiskData(&extr, outbound);
|
|
|
+ const NMatrix::TVectorType local = memRec.GetIngress().LocalParts(GType);
|
|
|
+ ui8 partIdx = local.FirstPosition();
|
|
|
+ for (const TDiskPart *p = extr.Begin; p != extr.End; ++p, partIdx = local.NextPosition(partIdx)) {
|
|
|
+ if (*p == Rec.OldDiskPart && partIdx + 1 == Rec.LogoBlobId.PartId()) {
|
|
|
+ Found = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
void Bootstrap(const TActorContext &ctx) {
|
|
|
SendNextRead(ctx);
|
|
|
Become(&TThis::StateFunc);
|
|
@@ -40,37 +75,26 @@ namespace NKikimr {
|
|
|
FullSnap->BarriersSnap.Destroy();
|
|
|
|
|
|
TLogoBlobsSnapshot::TForwardIterator iter(FullSnap->HullCtx, &FullSnap->LogoBlobsSnap);
|
|
|
- const TLogoBlobID id = Recs[RecToReadIdx].LogoBlobId;
|
|
|
+ auto& rec = Recs[RecToReadIdx];
|
|
|
+ const TLogoBlobID id = rec.LogoBlobId;
|
|
|
iter.Seek(id.FullID());
|
|
|
if (iter.Valid() && iter.GetCurKey().LogoBlobID() == id.FullID()) {
|
|
|
- struct TCallback {
|
|
|
- const ui32 PartId;
|
|
|
- TDiskPart Location;
|
|
|
-
|
|
|
- void operator ()(const TDiskPart& data, const NMatrix::TVectorType v) {
|
|
|
- if (v.Get(PartId - 1)) {
|
|
|
- Y_VERIFY(v == NMatrix::TVectorType::MakeOneHot(PartId - 1, v.GetSize()));
|
|
|
- Location = data;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void operator ()(const TDiskBlob&) {}
|
|
|
- } callback{id.PartId(), {}};
|
|
|
- TRecordMergerCallback<TKeyLogoBlob, TMemRecLogoBlob, TCallback> merger(&callback, DCtx->VCtx->Top->GType);
|
|
|
+ TCheckLocationMerger merger(rec, DCtx->VCtx->Top->GType);
|
|
|
iter.PutToMerger(&merger);
|
|
|
- merger.Finish();
|
|
|
-
|
|
|
- const TDiskPart &p = callback.Location;
|
|
|
- auto msg = std::make_unique<NPDisk::TEvChunkRead>(DCtx->PDiskCtx->Dsk->Owner,
|
|
|
- DCtx->PDiskCtx->Dsk->OwnerRound, p.ChunkIdx, p.Offset, p.Size, NPriRead::HullComp, nullptr);
|
|
|
- ctx.Send(DCtx->PDiskCtx->PDiskId, msg.release());
|
|
|
- DCtx->DefragMonGroup.DefragBytesRewritten() += p.Size;
|
|
|
- RewrittenBytes += p.Size;
|
|
|
- } else {
|
|
|
- ++RecToReadIdx;
|
|
|
- ++RewrittenRecsCounter;
|
|
|
- SendNextRead(ctx);
|
|
|
+ if (merger.Found) {
|
|
|
+ const TDiskPart& p = rec.OldDiskPart;
|
|
|
+ auto msg = std::make_unique<NPDisk::TEvChunkRead>(DCtx->PDiskCtx->Dsk->Owner,
|
|
|
+ DCtx->PDiskCtx->Dsk->OwnerRound, p.ChunkIdx, p.Offset, p.Size, NPriRead::HullComp, nullptr);
|
|
|
+ ctx.Send(DCtx->PDiskCtx->PDiskId, msg.release());
|
|
|
+ DCtx->DefragMonGroup.DefragBytesRewritten() += p.Size;
|
|
|
+ RewrittenBytes += p.Size;
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ ++RecToReadIdx;
|
|
|
+ ++RewrittenRecsCounter;
|
|
|
+ SendNextRead(ctx);
|
|
|
}
|
|
|
|
|
|
void Handle(NPDisk::TEvChunkReadResult::TPtr ev, const TActorContext& ctx) {
|