|
@@ -44,7 +44,7 @@ bool TPDisk::InitCommonLogger() {
|
|
|
}
|
|
|
CommonLogger->SwitchToNewChunk(TReqId(TReqId::InitCommonLoggerSwitchToNewChunk, 0), nullptr);
|
|
|
|
|
|
- // Log chunk can be collected as soon as noone needs it
|
|
|
+ // Log chunk can be collected as soon as no one needs it
|
|
|
ChunkState[chunkIdx].CommitState = TChunkState::DATA_COMMITTED;
|
|
|
}
|
|
|
bool isOk = LogNonceJump(InitialPreviousNonce);
|
|
@@ -200,6 +200,7 @@ bool TPDisk::ProcessChunk0(const NPDisk::TEvReadLogResult &readLogResult, TStrin
|
|
|
<< " Marker# BPD48");
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
TSysLogRecord *sysLogRecord = (TSysLogRecord*)(lastSysLogRecord.data());
|
|
|
|
|
|
if (sysLogRecord->Version < PDISK_SYS_LOG_RECORD_INCOMPATIBLE_VERSION_1000) {
|
|
@@ -481,10 +482,9 @@ TRcBuf TPDisk::ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn,
|
|
|
}
|
|
|
|
|
|
void TPDisk::ReadAndParseMainLog(const TActorId &pDiskActor) {
|
|
|
- TVector<TLogChunkItem> chunksToRead;
|
|
|
TIntrusivePtr<TLogReaderBase> logReader(new TLogReader(true, this, ActorSystem, pDiskActor, 0, TLogPosition{0, 0},
|
|
|
EOwnerGroupType::Static, TLogPosition{0, 0}, (ui64)-1, SysLogRecord.LogHeadChunkPreviousNonce, 0, 0,
|
|
|
- TReqId(TReqId::ReadAndParseMainLog, 0), std::move(chunksToRead), 0, 0, TVDiskID::InvalidId));
|
|
|
+ TReqId(TReqId::ReadAndParseMainLog, 0), TVector<TLogChunkItem>(), 0, 0, TVDiskID::InvalidId));
|
|
|
TVector<ui64> badOffsets;
|
|
|
// Emits subrequests TCompletionLogReadPart which contains TIntrusivePtr to logReader
|
|
|
logReader->Exec(0, badOffsets, ActorSystem);
|
|
@@ -524,6 +524,7 @@ void TPDisk::ProcessLogReadQueue() {
|
|
|
}
|
|
|
ui32 endLogChunkIdx = CommonLogger->ChunkIdx;
|
|
|
ui64 endLogSectorIdx = CommonLogger->SectorIdx;
|
|
|
+
|
|
|
ownerData.LogReader = new TLogReader(false,
|
|
|
this, ActorSystem, logRead.Sender, logRead.Owner, logStartPosition,
|
|
|
logRead.OwnerGroupType,logRead.Position,
|
|
@@ -1200,7 +1201,7 @@ void TPDisk::OnLogCommitDone(TLogCommitDone &req) {
|
|
|
if (isChunkReleased) {
|
|
|
THolder<TCompletionEventSender> completion(new TCompletionEventSender(this));
|
|
|
if (ReleaseUnusedLogChunks(completion.Get())) {
|
|
|
- WriteSysLogRestorePoint(completion.Release(), req.ReqId, {}); // FIXME: wilson
|
|
|
+ WriteSysLogRestorePoint(completion.Release(), req.ReqId, {});
|
|
|
}
|
|
|
}
|
|
|
TryTrimChunk(false, 0);
|
|
@@ -1247,6 +1248,28 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void TPDisk::NotifyLogChunkRead(ui32 chunkIdx, TOwner reader) {
|
|
|
+ TGuard<TMutex> guard(StateMutex);
|
|
|
+ auto iter = LogRecoveryState.Readers.find(chunkIdx);
|
|
|
+
|
|
|
+ if (iter == LogRecoveryState.Readers.end()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ auto &readers = iter->second;
|
|
|
+
|
|
|
+ if (readers.any()) {
|
|
|
+ // If there's at least one registered reader.
|
|
|
+ readers.set(reader, false);
|
|
|
+
|
|
|
+ if (readers.none()) {
|
|
|
+ LogRecoveryState.Readers.erase(iter);
|
|
|
+
|
|
|
+ BlockDevice->EraseCacheRange(Format.Offset(chunkIdx, 0), Format.Offset(chunkIdx + 1, 0));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// Schedules EvReadLogResult event for the system log
|
|
|
void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) {
|
|
|
Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running");
|
|
@@ -1294,6 +1317,7 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
|
|
|
switch (InitPhase) {
|
|
|
case EInitPhase::ReadingSysLog:
|
|
|
{
|
|
|
+ // Finished reading sys log.
|
|
|
TString errorReason;
|
|
|
bool success = ProcessChunk0(evReadLogResult, errorReason);
|
|
|
|
|
@@ -1315,6 +1339,7 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
|
|
|
}
|
|
|
case EInitPhase::ReadingLog:
|
|
|
{
|
|
|
+ // Finished reading main log.
|
|
|
InitialLogPosition = evReadLogResult.NextPosition;
|
|
|
if (InitialLogPosition == TLogPosition{0, 0}) {
|
|
|
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialCommonLogParseError;
|
|
@@ -1403,6 +1428,17 @@ void TPDisk::ProcessReadLogResult(const NPDisk::TEvReadLogResult &evReadLogResul
|
|
|
ActorSystem->Send(pDiskActor, new TEvLogInitResult(false, errorReason));
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ // For every log chunk build a list of readers. These readers will be reading this chunk.
|
|
|
+ for (auto it = LogChunks.begin(); it != LogChunks.end(); ++it) {
|
|
|
+ auto &readers = LogRecoveryState.Readers[it->ChunkIdx];
|
|
|
+
|
|
|
+ for (ui32 owner = 0; owner < it->OwnerLsnRange.size(); ++owner) {
|
|
|
+ if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
|
|
|
+ readers.set(owner, true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Increase Nonces to prevent collisions
|