Browse Source

KIKIMR-19060 Only write chunks with state LOG_COMMITTED to cache

senya0x5f 1 year ago
parent
commit
566cf4ffcd

+ 11 - 9
ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp

@@ -1272,16 +1272,18 @@ public:
             ui64 chunkIdx = offset / PDisk->Format.ChunkSize;
             Y_VERIFY(chunkIdx < PDisk->ChunkState.size());
             
-            if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) {
-                // TODO: split buffer if crossing chunk boundary instead of completely discarding it
-                LOG_INFO_S(
-                    *ActorSystem, NKikimrServices::BS_DEVICE,
-                    "Skip caching log read due to chunk boundary crossing");
-            } else {
-                if (Cache.Size() < MaxCount) {
-                    const char* dataPtr = static_cast<const char*>(completion->GetData());
+            if (TChunkState::LOG_COMMITTED == PDisk->ChunkState[chunkIdx].CommitState) {
+                if ((offset % PDisk->Format.ChunkSize) + completion->GetSize() > PDisk->Format.ChunkSize) {
+                    // TODO: split buffer if crossing chunk boundary instead of completely discarding it
+                    LOG_INFO_S(
+                        *ActorSystem, NKikimrServices::BS_DEVICE,
+                        "Skip caching log read due to chunk boundary crossing");
+                } else {
+                    if (Cache.Size() < MaxCount) {
+                        const char* dataPtr = static_cast<const char*>(completion->GetData());
 
-                    Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets());   
+                        Cache.Insert(dataPtr, completion->GetOffset(), completion->GetSize(), completion->GetBadOffsets());   
+                    }
                 }
             }
 

+ 2 - 0
ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp

@@ -1211,6 +1211,8 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
     TGuard<TMutex> guard(StateMutex);
 
     for (const auto& chunkIdx : req.ChunksToRelease) {
+        LogRecoveryState.Readers.erase(chunkIdx);
+
         BlockDevice->EraseCacheRange(
             Format.Offset(chunkIdx, 0), 
             Format.Offset(chunkIdx + 1, 0));

+ 5 - 5
ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp

@@ -474,8 +474,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
         TVDiskMock vdisk(&testCtx);
         vdisk.InitFull();
 
-        // Fill one log chunk so that on restart it is fully read.
-        for (int i = 0; i < 128; i++) {
+        // Fill log so that one chunk has state LOG_COMMITED on restart.
+        for (int i = 0; i < 256; i++) {
             vdisk.SendEvLogSync(32768);
         }
 
@@ -483,14 +483,14 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
 
         vdisk.Init();
 
-        // Assert recovery state with readers 1.
+        // Assert recovery state has 2 chunks to be read by vdisk.
         testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) {
-            UNIT_ASSERT_EQUAL(1, disk->LogRecoveryState.Readers.size());
+            UNIT_ASSERT_EQUAL(2, disk->LogRecoveryState.Readers.size());
         });
 
         vdisk.ReadLog();
         
-        // Assert recovery state with readers 0.
+        // Assert recovery state is empty after vdisk read its chunks.
         testCtx.SafeRunOnPDisk([](NPDisk::TPDisk* disk) {
             UNIT_ASSERT_EQUAL(0, disk->LogRecoveryState.Readers.size());
         });