|
@@ -103,14 +103,14 @@ namespace NKikimr {
|
|
|
NBackpressure::TQueueClientId ClientId;
|
|
|
TActorId ActorId;
|
|
|
NWilson::TSpan Span;
|
|
|
- std::unique_ptr<TVDiskSkeletonTrace> Trace;
|
|
|
+ std::shared_ptr<TVDiskSkeletonTrace> Trace;
|
|
|
ui64 Cookie;
|
|
|
|
|
|
TRecord() = default;
|
|
|
|
|
|
TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
|
|
|
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
|
|
|
- const NBackpressure::TQueueClientId& clientId, TString name, std::unique_ptr<TVDiskSkeletonTrace> &&trace,
|
|
|
+ const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
|
|
|
ui64 cookie)
|
|
|
: Ev(std::move(ev))
|
|
|
, ReceivedTime(now)
|
|
@@ -144,14 +144,14 @@ namespace NKikimr {
|
|
|
////////////////////////////////////////////////////////////////////////////
|
|
|
class TIntQueueClass {
|
|
|
using TQueueType = TQueueInplace<TRecord, 4096>;
|
|
|
- using TFreeTraceObjectsQueue = TQueueInplace<std::unique_ptr<TVDiskSkeletonTrace>, 4096>;
|
|
|
+ using TFreeTraceObjectsQueue = TQueueInplace<std::shared_ptr<TVDiskSkeletonTrace>, 4096>;
|
|
|
|
|
|
struct TMsgInfo {
|
|
|
ui64 MsgId;
|
|
|
TInstant ReceivedTime;
|
|
|
- std::unique_ptr<TVDiskSkeletonTrace> VDiskSkeletonTrace;
|
|
|
+ std::shared_ptr<TVDiskSkeletonTrace> VDiskSkeletonTrace;
|
|
|
|
|
|
- TMsgInfo(ui64 msgId, TInstant receivedTime, std::unique_ptr<TVDiskSkeletonTrace> &&trace)
|
|
|
+ TMsgInfo(ui64 msgId, TInstant receivedTime, std::shared_ptr<TVDiskSkeletonTrace> &&trace)
|
|
|
: MsgId(msgId)
|
|
|
, ReceivedTime(receivedTime)
|
|
|
, VDiskSkeletonTrace(std::move(trace))
|
|
@@ -220,13 +220,14 @@ namespace NKikimr {
|
|
|
return Queue->GetSize();
|
|
|
}
|
|
|
|
|
|
- std::unique_ptr<TVDiskSkeletonTrace> GetFreeTrace() {
|
|
|
- if (std::unique_ptr<TVDiskSkeletonTrace> *ptr = FreeTraceObjects->Head()) {
|
|
|
- std::unique_ptr<TVDiskSkeletonTrace> tmp = std::move(*ptr);
|
|
|
+ std::shared_ptr<TVDiskSkeletonTrace> GetFreeTrace() {
|
|
|
+ if (std::shared_ptr<TVDiskSkeletonTrace> *ptr = FreeTraceObjects->Head()) {
|
|
|
+ std::shared_ptr<TVDiskSkeletonTrace> tmp = std::move(*ptr);
|
|
|
+ Y_DEBUG_ABORT_UNLESS(tmp.use_count() == 1);
|
|
|
FreeTraceObjects->Pop();
|
|
|
return tmp;
|
|
|
} else {
|
|
|
- return std::make_unique<TVDiskSkeletonTrace>();
|
|
|
+ return std::make_shared<TVDiskSkeletonTrace>();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -234,7 +235,7 @@ namespace NKikimr {
|
|
|
void Enqueue(const TActorContext &ctx, ui32 recByteSize, std::unique_ptr<IEventHandle> converted,
|
|
|
const NBackpressure::TMessageId &msgId, ui64 cost, const TInstant &deadline,
|
|
|
NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& front,
|
|
|
- const NBackpressure::TQueueClientId& clientId, std::unique_ptr<TVDiskSkeletonTrace> &&trace) {
|
|
|
+ const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace) {
|
|
|
Y_UNUSED(front);
|
|
|
ui64 cookie = converted->Cookie;
|
|
|
if (!Queue->Head() && CanSendToSkeleton(cost)) {
|
|
@@ -338,10 +339,9 @@ namespace NKikimr {
|
|
|
auto it = Msgs.find(cookie);
|
|
|
if (it != Msgs.end()) {
|
|
|
Y_VERIFY_S(it!= Msgs.end(), "cookie# " << cookie);
|
|
|
- if (it->second.VDiskSkeletonTrace) {
|
|
|
- it->second.VDiskSkeletonTrace->AdditionalTrace = nullptr;
|
|
|
- it->second.VDiskSkeletonTrace->MarkCount = 0;
|
|
|
- FreeTraceObjects->Push(std::move(it->second.VDiskSkeletonTrace));
|
|
|
+ if (auto trace = std::exchange(it->second.VDiskSkeletonTrace, nullptr); trace && trace.use_count() == 1) {
|
|
|
+ trace->Clear();
|
|
|
+ FreeTraceObjects->Push(std::move(trace));
|
|
|
}
|
|
|
Msgs.erase(it);
|
|
|
}
|
|
@@ -1267,10 +1267,10 @@ namespace NKikimr {
|
|
|
std::unique_ptr<IEventHandle> event = extQueue.Enqueue(ctx, std::unique_ptr<IEventHandle>(
|
|
|
ev->Forward(SkeletonId).Release()), msgId, cost, *this, clientId);
|
|
|
if (event) {
|
|
|
- std::unique_ptr<TVDiskSkeletonTrace> trace;
|
|
|
+ std::shared_ptr<TVDiskSkeletonTrace> trace;
|
|
|
if constexpr (IsPatchEvent<std::decay_t<decltype(*ev->Get())>>) {
|
|
|
trace = intQueue.GetFreeTrace();
|
|
|
- event->Get<std::decay_t<decltype(*ev->Get())>>()->VDiskSkeletonTrace = trace.get();
|
|
|
+ event->Get<std::decay_t<decltype(*ev->Get())>>()->VDiskSkeletonTrace = trace;
|
|
|
}
|
|
|
// good, enqueue it in intQueue
|
|
|
intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost,
|