|
@@ -527,7 +527,7 @@ bool TPDisk::ReleaseUnusedLogChunks(TCompletionEventSender *completion) {
|
|
|
// Case 1: Chunks to be deleted located at the start of LogChunks list
|
|
|
} else if (!gapStart && gapEnd) {
|
|
|
IsLogChunksReleaseInflight = true;
|
|
|
- completion->Req = THolder<TRequestBase>(ReqCreator.CreateFromArgs<TReleaseChunks>(std::move(chunksToRelease)));
|
|
|
+ completion->Req = THolder<TRequestBase>(ReqCreator.CreateFromArgs<TReleaseChunks>(std::move(chunksToRelease), NWilson::TSpan{}));
|
|
|
SysLogRecord.LogHeadChunkIdx = gapEnd->ChunkIdx;
|
|
|
SysLogRecord.LogHeadChunkPreviousNonce = ChunkState[gapEnd->ChunkIdx].PreviousNonce;
|
|
|
PrintLogChunksInfo("cut tail log");
|
|
@@ -537,7 +537,7 @@ bool TPDisk::ReleaseUnusedLogChunks(TCompletionEventSender *completion) {
|
|
|
Y_ABORT_UNLESS(KIKIMR_PDISK_ENABLE_CUT_LOG_FROM_THE_MIDDLE);
|
|
|
IsLogChunksReleaseInflight = true;
|
|
|
Mon.SplicedLogChunks->Add(chunksToRelease.size());
|
|
|
- completion->Req = THolder<TRequestBase>(ReqCreator.CreateFromArgs<TReleaseChunks>(*gapStart, *gapEnd, std::move(chunksToRelease)));
|
|
|
+ completion->Req = THolder<TRequestBase>(ReqCreator.CreateFromArgs<TReleaseChunks>(*gapStart, *gapEnd, std::move(chunksToRelease), NWilson::TSpan{}));
|
|
|
PrintLogChunksInfo("log splice");
|
|
|
return true;
|
|
|
} else {
|
|
@@ -892,6 +892,7 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
|
|
|
const ui32 count = evChunkWrite->PartsPtr->Size();
|
|
|
for (ui32 partIdx = evChunkWrite->CurrentPart; partIdx < count; ++partIdx) {
|
|
|
ui32 remainingPartSize = (*evChunkWrite->PartsPtr)[partIdx].second - evChunkWrite->CurrentPartOffset;
|
|
|
+ auto traceId = evChunkWrite->Span.GetTraceId();
|
|
|
if (bytesAvailable < remainingPartSize) {
|
|
|
ui32 sizeToWrite = bytesAvailable;
|
|
|
if (sizeToWrite > 0) {
|
|
@@ -899,10 +900,10 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
|
|
|
if (data) {
|
|
|
ui8 *source = data + evChunkWrite->CurrentPartOffset;
|
|
|
NSan::CheckMemIsInitialized(source, sizeToWrite);
|
|
|
- writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId);
|
|
|
+ writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &traceId);
|
|
|
*Mon.BandwidthPChunkPayload += sizeToWrite;
|
|
|
} else {
|
|
|
- writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId);
|
|
|
+ writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &traceId);
|
|
|
*Mon.BandwidthPChunkPadding += sizeToWrite;
|
|
|
}
|
|
|
evChunkWrite->RemainingSize -= sizeToWrite;
|
|
@@ -918,10 +919,10 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
|
|
|
ui8 *data = (ui8*)(*evChunkWrite->PartsPtr)[partIdx].first;
|
|
|
if (data) {
|
|
|
ui8 *source = data + evChunkWrite->CurrentPartOffset;
|
|
|
- writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId);
|
|
|
+ writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &traceId);
|
|
|
*Mon.BandwidthPChunkPayload += sizeToWrite;
|
|
|
} else {
|
|
|
- writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId);
|
|
|
+ writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &traceId);
|
|
|
*Mon.BandwidthPChunkPadding += sizeToWrite;
|
|
|
}
|
|
|
evChunkWrite->CurrentPartOffset = 0;
|
|
@@ -938,13 +939,15 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
|
|
|
<< " Marker# BPD79");
|
|
|
|
|
|
if (!writer.IsEmptySector()) {
|
|
|
+ auto traceId = evChunkWrite->Span.GetTraceId();
|
|
|
*Mon.BandwidthPChunkPadding += writer.SectorBytesFree;
|
|
|
- writer.WriteZeroes(writer.SectorBytesFree, evChunkWrite->ReqId, &evChunkWrite->TraceId);
|
|
|
+ writer.WriteZeroes(writer.SectorBytesFree, evChunkWrite->ReqId, &traceId);
|
|
|
LOG_INFO(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " chunkIdx# %" PRIu32
|
|
|
" was zero-padded after writing", (ui32)PDiskId, (ui32)chunkIdx);
|
|
|
}
|
|
|
+ auto traceId = evChunkWrite->Span.GetTraceId();
|
|
|
evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit);
|
|
|
- writer.Flush(evChunkWrite->ReqId, &evChunkWrite->TraceId, evChunkWrite->Completion.Release());
|
|
|
+ writer.Flush(evChunkWrite->ReqId, &traceId, evChunkWrite->Completion.Release());
|
|
|
|
|
|
|
|
|
evChunkWrite->IsReplied = true;
|
|
@@ -1053,8 +1056,9 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
|
|
|
completion->CostNs = DriveModel.TimeForSizeNs(bytesToRead, read->ChunkIdx, TDriveModel::OP_TYPE_READ);
|
|
|
Y_ABORT_UNLESS(bytesToRead <= completion->GetBuffer()->Size());
|
|
|
ui8 *data = completion->GetBuffer()->Data();
|
|
|
+ auto traceId = read->Span.GetTraceId();
|
|
|
BlockDevice->PreadAsync(data, bytesToRead, readOffset, completion.Release(),
|
|
|
- read->ReqId, &read->TraceId);
|
|
|
+ read->ReqId, &traceId);
|
|
|
// TODO: align the data on SectorSize, not PAGE_SIZE
|
|
|
// TODO: use the BLKSSZGET ioctl to obtain a backing store's sector size
|
|
|
return isTheLastPart ? ReadPieceResultOk : ReadPieceResultInProgress;
|
|
@@ -1346,7 +1350,8 @@ TVector<TChunkIdx> TPDisk::AllocateChunkForOwner(const TRequestBase *req, const
|
|
|
for (TChunkIdx chunkIdx : chunks) {
|
|
|
ui64 chunkNonce = SysLogRecord.Nonces.Value[NonceData];
|
|
|
SysLogRecord.Nonces.Value[NonceData] += dataChunkSizeSectors;
|
|
|
- OnNonceChange(NonceData, req->ReqId, &req->TraceId);
|
|
|
+ auto traceId = req->Span.GetTraceId();
|
|
|
+ OnNonceChange(NonceData, req->ReqId, &traceId);
|
|
|
// Remember who owns the sector, save chunk Nonce in order to be able to continue writing the chunk
|
|
|
TChunkState &state = ChunkState[chunkIdx];
|
|
|
Y_VERIFY_S(state.OwnerId == OwnerUnallocated
|
|
@@ -2140,7 +2145,7 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven
|
|
|
<< " removed ownerId# " << owner << " from chunks Keeper");
|
|
|
}
|
|
|
|
|
|
- TryTrimChunk(false, 0);
|
|
|
+ TryTrimChunk(false, 0, NWilson::TSpan{});
|
|
|
ui64 lastSeenLsn = 0;
|
|
|
auto it = LogChunks.begin();
|
|
|
while (it != LogChunks.end()) {
|
|
@@ -2401,7 +2406,7 @@ void TPDisk::ClearQuarantineChunks() {
|
|
|
}
|
|
|
|
|
|
// Should be called to initiate TRIM (on chunk delete or prev trim done)
|
|
|
-void TPDisk::TryTrimChunk(bool prevDone, ui64 trimmedSize) {
|
|
|
+void TPDisk::TryTrimChunk(bool prevDone, ui64 trimmedSize, const NWilson::TSpan& parentSpan) {
|
|
|
TGuard<TMutex> g(StateMutex);
|
|
|
TrimOffset += trimmedSize;
|
|
|
if (!DriveModel.IsTrimSupported()) {
|
|
@@ -2440,7 +2445,7 @@ void TPDisk::TryTrimChunk(bool prevDone, ui64 trimmedSize) {
|
|
|
if (ChunkBeingTrimmed) { // Initiate trim of next part of chunk
|
|
|
const ui64 trimStep = (Keeper.GetTrimmedFreeChunkCount() > 100 ? 2 << 20 : 32 << 20);
|
|
|
ui64 trimSize = Min<ui64>(Format.ChunkSize - TrimOffset, trimStep);
|
|
|
- TChunkTrim* trim = ReqCreator.CreateChunkTrim(ChunkBeingTrimmed, TrimOffset, trimSize);
|
|
|
+ TChunkTrim* trim = ReqCreator.CreateChunkTrim(ChunkBeingTrimmed, TrimOffset, trimSize, parentSpan);
|
|
|
InputRequest(trim);
|
|
|
TrimInFly = true;
|
|
|
}
|
|
@@ -2496,7 +2501,7 @@ void TPDisk::ProcessFastOperationsQueue() {
|
|
|
OnLogCommitDone(static_cast<TLogCommitDone&>(*req));
|
|
|
break;
|
|
|
case ERequestType::RequestTryTrimChunk:
|
|
|
- TryTrimChunk(true, static_cast<TTryTrimChunk&>(*req).TrimSize);
|
|
|
+ TryTrimChunk(true, static_cast<TTryTrimChunk&>(*req).TrimSize, req->Span);
|
|
|
break;
|
|
|
case ERequestType::RequestReleaseChunks:
|
|
|
MarkChunksAsReleased(static_cast<TReleaseChunks&>(*req));
|
|
@@ -2746,6 +2751,7 @@ void TPDisk::PrepareLogError(TLogWrite *logWrite, TStringStream& err, NKikimrPro
|
|
|
<< " lsn# " << logWrite->Lsn;
|
|
|
LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, err.Str());
|
|
|
|
|
|
+ logWrite->Span.EndError(err.Str());
|
|
|
logWrite->Result.Reset(new NPDisk::TEvLogResult(status,
|
|
|
GetStatusFlags(logWrite->Owner, logWrite->OwnerGroupType), err.Str()));
|
|
|
logWrite->Result->Results.push_back(NPDisk::TEvLogResult::TRecord(logWrite->Lsn, logWrite->Cookie));
|
|
@@ -3115,6 +3121,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
|
|
|
}
|
|
|
|
|
|
void TPDisk::PushRequestToForseti(TRequestBase *request) {
|
|
|
+ request->Span.Event("PushToForseti", {});
|
|
|
if (request->GateId != GateFastOperation) {
|
|
|
bool isAdded = false;
|
|
|
|
|
@@ -3153,6 +3160,9 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
|
|
|
&& static_cast<TRequestBase *>(job->Payload)->GetType() == ERequestType::RequestLogWrite) {
|
|
|
TLogWrite &batch = *static_cast<TLogWrite*>(job->Payload);
|
|
|
|
|
|
+ request->Span.Event("AddToBatch", NWilson::TKeyValueList{{
|
|
|
+ { "Batch.ReqId", static_cast<i64>(batch.ReqId.Id) },
|
|
|
+ }});
|
|
|
batch.AddToBatch(static_cast<TLogWrite*>(request));
|
|
|
ui64 prevCost = job->Cost;
|
|
|
job->Cost += request->Cost;
|
|
@@ -3176,12 +3186,17 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
|
|
|
SplitChunkJobSize(whole->TotalSize, &smallJobSize, &largeJobSize, &smallJobCount);
|
|
|
for (ui32 idx = 0; idx < smallJobCount; ++idx) {
|
|
|
// Schedule small job.
|
|
|
- TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize);
|
|
|
+ auto span = request->Span.CreateChild(TWilson::PDisk, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END);
|
|
|
+ span.Attribute("small_job_idx", idx)
|
|
|
+ .Attribute("is_last_piece", false);
|
|
|
+ TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize, std::move(span));
|
|
|
piece->EstimateCost(DriveModel);
|
|
|
AddJobToForseti(cbs, piece, request->JobKind);
|
|
|
}
|
|
|
// Schedule large job (there always is one)
|
|
|
- TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize);
|
|
|
+ auto span = request->Span.CreateChild(TWilson::PDisk, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END);
|
|
|
+ span.Attribute("is_last_piece", true);
|
|
|
+ TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span));
|
|
|
piece->EstimateCost(DriveModel);
|
|
|
AddJobToForseti(cbs, piece, request->JobKind);
|
|
|
LWTRACK(PDiskAddWritePieceToScheduler, request->Orbit, PDiskId, request->ReqId.Id,
|
|
@@ -3199,9 +3214,12 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
|
|
|
ui32 largeJobSize = totalSectors - smallJobSize * smallJobCount;
|
|
|
|
|
|
for (ui32 idx = 0; idx < smallJobCount; ++idx) {
|
|
|
+ auto span = request->Span.CreateChild(TWilson::PDisk, "PDisk.ChunkReadPiece", NWilson::EFlags::AUTO_END);
|
|
|
+ span.Attribute("small_job_idx", idx)
|
|
|
+ .Attribute("is_last_piece", false);
|
|
|
// Schedule small job.
|
|
|
auto piece = new TChunkReadPiece(read, idx * smallJobSize,
|
|
|
- smallJobSize * Format.SectorSize, false);
|
|
|
+ smallJobSize * Format.SectorSize, false, std::move(span));
|
|
|
LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, idx, idx * smallJobSize,
|
|
|
smallJobSize * Format.SectorSize);
|
|
|
piece->EstimateCost(DriveModel);
|
|
@@ -3209,18 +3227,23 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
|
|
|
AddJobToForseti(cbs, piece, request->JobKind);
|
|
|
}
|
|
|
// Schedule large job (there always is one)
|
|
|
+ NWilson::TSpan span(TWilson::PDisk, request->Span.GetTraceId(),
|
|
|
+ "PDisk.ChunkReadPiece", NWilson::EFlags::NONE, request->Span.GetActorSystem());
|
|
|
+ span.Attribute("is_last_piece", true);
|
|
|
auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize,
|
|
|
- largeJobSize * Format.SectorSize, true);
|
|
|
+ largeJobSize * Format.SectorSize, true, std::move(span));
|
|
|
LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, smallJobCount,
|
|
|
smallJobCount * smallJobSize, largeJobSize * Format.SectorSize);
|
|
|
piece->EstimateCost(DriveModel);
|
|
|
piece->SelfPointer = piece;
|
|
|
AddJobToForseti(cbs, piece, request->JobKind);
|
|
|
+
|
|
|
} else {
|
|
|
AddJobToForseti(cbs, request, request->JobKind);
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
+ request->Span.Event("PushToFastOperationsQueue", {});
|
|
|
FastOperationsQueue.push_back(std::unique_ptr<TRequestBase>(request));
|
|
|
LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " ReqId# %" PRIu64
|
|
|
" PushRequestToForseti Push to FastOperationsQueue.size# %" PRIu64,
|
|
@@ -3268,21 +3291,25 @@ void TPDisk::RouteRequest(TRequestBase *request) {
|
|
|
case ERequestType::RequestLogReadResultProcess:
|
|
|
[[fallthrough]];
|
|
|
case ERequestType::RequestLogSectorRestore:
|
|
|
+ request->Span.Event("PushToJointLogReads", {});
|
|
|
JointLogReads.push_back(request);
|
|
|
break;
|
|
|
case ERequestType::RequestChunkReadPiece:
|
|
|
{
|
|
|
TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(request);
|
|
|
+ request->Span.Event("PushToJointChunkReads", {});
|
|
|
JointChunkReads.emplace_back(piece->SelfPointer.Get());
|
|
|
piece->SelfPointer.Reset();
|
|
|
// FIXME(cthulhu): Unreserve() for TChunkReadPiece is called while processing to avoid requeueing issues
|
|
|
break;
|
|
|
}
|
|
|
case ERequestType::RequestChunkWritePiece:
|
|
|
+ request->Span.Event("PushToJointChunkWrites", {});
|
|
|
JointChunkWrites.push_back(request);
|
|
|
break;
|
|
|
case ERequestType::RequestChunkTrim:
|
|
|
{
|
|
|
+ request->Span.Event("PushToJointChunkTrims", {});
|
|
|
TChunkTrim *trim = static_cast<TChunkTrim*>(request);
|
|
|
JointChunkTrims.push_back(trim);
|
|
|
break;
|
|
@@ -3293,8 +3320,10 @@ void TPDisk::RouteRequest(TRequestBase *request) {
|
|
|
while (log) {
|
|
|
TLogWrite *batch = log->PopFromBatch();
|
|
|
|
|
|
+ log->Span.Event("PushToJointLogWrites", {});
|
|
|
JointLogWrites.push_back(log);
|
|
|
if (log->Signature.HasCommitRecord()) {
|
|
|
+ log->Span.Event("PushToJointCommits", {});
|
|
|
JointCommits.push_back(log);
|
|
|
}
|
|
|
log = batch;
|
|
@@ -3303,6 +3332,7 @@ void TPDisk::RouteRequest(TRequestBase *request) {
|
|
|
}
|
|
|
case ERequestType::RequestChunkForget:
|
|
|
{
|
|
|
+ request->Span.Event("PushToJointChunkForgets", {});
|
|
|
TChunkForget *forget = static_cast<TChunkForget*>(request);
|
|
|
JointChunkForgets.push_back(std::unique_ptr<TChunkForget>(forget));
|
|
|
break;
|
|
@@ -3393,9 +3423,11 @@ void TPDisk::EnqueueAll() {
|
|
|
TYardControl &evControl = *static_cast<TYardControl*>(request);
|
|
|
switch (evControl.Action) {
|
|
|
case NPDisk::TEvYardControl::ActionPause:
|
|
|
+ request->Span.Event("PushToPausedQueue", {});
|
|
|
PausedQueue.push_back(request);
|
|
|
break;
|
|
|
case NPDisk::TEvYardControl::ActionStep:
|
|
|
+ request->Span.Event("PushToPausedQueue", {});
|
|
|
PausedQueue.push_back(request);
|
|
|
ActorSystem->Send(evControl.Sender, new NPDisk::TEvYardControlResult(
|
|
|
NKikimrProto::OK, evControl.Cookie, TString()));
|
|
@@ -3416,6 +3448,7 @@ void TPDisk::EnqueueAll() {
|
|
|
break;
|
|
|
}
|
|
|
} else {
|
|
|
+ request->Span.Event("PushToPausedQueue", {});
|
|
|
PausedQueue.push_back(request);
|
|
|
}
|
|
|
} else {
|