#pragma once #include "log.h" #include "probe.h" #include #include namespace NLWTrace { template class TRunLogShuttleActionExecutor; //////////////////////////////////////////////////////////////////////////////// struct THostTimeCalculator { double K = 0; ui64 B = 0; THostTimeCalculator() { TInstant now = TInstant::Now(); ui64 tsNow = GetCycleCount(); K = 1000000000 / NHPTimer::GetClockRate(); B = now.NanoSeconds() - K * tsNow; } ui64 CyclesToEpochNanoseconds(ui64 cycles) const { return K*cycles + B; } ui64 EpochNanosecondsToCycles(ui64 ns) const { return (ns - B) / K; } }; inline ui64 CyclesToEpochNanoseconds(ui64 cycles) { return Singleton()->CyclesToEpochNanoseconds(cycles); } inline ui64 EpochNanosecondsToCycles(ui64 ns) { return Singleton()->EpochNanosecondsToCycles(ns); } //////////////////////////////////////////////////////////////////////////////// template class TLogShuttle: public IShuttle { private: using TExecutor = TRunLogShuttleActionExecutor; TTrackLog TrackLog; TExecutor* Executor; bool Ignore = false; size_t MaxTrackLength; TAdaptiveLock Lock; TAtomic ForkFailed = 0; public: explicit TLogShuttle(TExecutor* executor) : IShuttle(executor->GetTraceIdx(), executor->NewSpanId()) , Executor(executor) , MaxTrackLength(Executor->GetAction().GetMaxTrackLength() ? Executor->GetAction().GetMaxTrackLength() : 100) { } bool DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) override; void DoEndOfTrack() override; void DoDrop() override; void DoSerialize(TShuttleTrace& msg) override; bool DoFork(TShuttlePtr& child) override; bool DoJoin(const TShuttlePtr& child) override; void SetIgnore(bool ignore); void Clear(); const TTrackLog& GetTrackLog() const { return TrackLog; } }; //////////////////////////////////////////////////////////////////////////////// template class TLogShuttleActionBase: public IExecutor { private: const ui64 TraceIdx; public: explicit TLogShuttleActionBase(ui64 traceIdx) : TraceIdx(traceIdx) { } ui64 GetTraceIdx() const { return TraceIdx; } static TLogShuttle* Cast(const TShuttlePtr& shuttle); static TLogShuttle* Cast(IShuttle* shuttle); }; //////////////////////////////////////////////////////////////////////////////// template class TRunLogShuttleActionExecutor: public TLogShuttleActionBase { private: TSpinLock Lock; TVector AllShuttles; TVector Parking; TRunLogShuttleAction Action; TDepot* Depot; TAtomic MissedTracks = 0; TAtomic* LastTrackId; TAtomic* LastSpanId; static constexpr int MaxShuttles = 100000; public: TRunLogShuttleActionExecutor(ui64 traceIdx, const TRunLogShuttleAction& action, TDepot* depot, TAtomic* lastTrackId, TAtomic* lastSpanId); ~TRunLogShuttleActionExecutor(); bool DoExecute(TOrbit& orbit, const TParams& params) override; void RecordShuttle(TLogShuttle* shuttle); void ParkShuttle(TLogShuttle* shuttle); void DiscardShuttle(); TShuttlePtr RentShuttle(); ui64 NewSpanId(); const TRunLogShuttleAction& GetAction() const { return Action; } }; //////////////////////////////////////////////////////////////////////////////// template class TEditLogShuttleActionExecutor: public TLogShuttleActionBase { private: TEditLogShuttleAction Action; public: TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action); bool DoExecute(TOrbit& orbit, const TParams& params) override; }; //////////////////////////////////////////////////////////////////////////////// template class TDropLogShuttleActionExecutor: public TLogShuttleActionBase { private: TDropLogShuttleAction Action; public: TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action); bool DoExecute(TOrbit& orbit, const TParams& params) override; }; //////////////////////////////////////////////////////////////////////////////// template bool TLogShuttle::DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) { with_lock (Lock) { if (TrackLog.Items.size() >= MaxTrackLength) { TrackLog.Truncated = true; return true; } TrackLog.Items.emplace_back(); TTrackLog::TItem* item = &TrackLog.Items.back(); item->ThreadId = 0; // TODO[serxa]: check if it is fast to run TThread::CurrentThreadId(); item->Probe = probe; if ((item->SavedParamsCount = probe->Event.Signature.ParamCount) > 0) { probe->Event.Signature.CloneParams(item->Params, params); } item->TimestampCycles = timestamp ? timestamp : GetCycleCount(); } return true; } template void TLogShuttle::DoEndOfTrack() { // Record track log if not ignored if (!Ignore) { if (AtomicGet(ForkFailed)) { Executor->DiscardShuttle(); } else { Executor->RecordShuttle(this); } } Executor->ParkShuttle(this); } template void TLogShuttle::DoDrop() { // Do not track log results of dropped shuttles Executor->ParkShuttle(this); } template void TLogShuttle::SetIgnore(bool ignore) { Ignore = ignore; } template void TLogShuttle::Clear() { TrackLog.Clear(); AtomicSet(ForkFailed, 0); } template void TLogShuttle::DoSerialize(TShuttleTrace& msg) { with_lock (Lock) { if (!GetTrackLog().Items.size()) { return ; } for (auto& record : GetTrackLog().Items) { auto *rec = msg.AddEvents(); rec->SetName(record.Probe->Event.Name); rec->SetProvider(record.Probe->Event.GetProvider()); rec->SetTimestampNanosec( CyclesToEpochNanoseconds(record.TimestampCycles)); record.Probe->Event.Signature.SerializeToPb(record.Params, *rec->MutableParams()); } } } template TLogShuttle* TLogShuttleActionBase::Cast(const TShuttlePtr& shuttle) { return static_cast*>(shuttle.Get()); } template TLogShuttle* TLogShuttleActionBase::Cast(IShuttle* shuttle) { return static_cast*>(shuttle); } //////////////////////////////////////////////////////////////////////////////// template TRunLogShuttleActionExecutor::TRunLogShuttleActionExecutor( ui64 traceIdx, const TRunLogShuttleAction& action, TDepot* depot, TAtomic* lastTrackId, TAtomic* lastSpanId) : TLogShuttleActionBase(traceIdx) , Action(action) , Depot(depot) , LastTrackId(lastTrackId) , LastSpanId(lastSpanId) { ui64 size = Min(Action.GetShuttlesCount() ? Action.GetShuttlesCount() : 1000, MaxShuttles); // Do not allow to allocate too much memory AllShuttles.reserve(size); Parking.reserve(size); for (ui64 i = 0; i < size; i++) { TShuttlePtr shuttle(new TLogShuttle(this)); AllShuttles.emplace_back(shuttle); Parking.emplace_back(shuttle); } } template TRunLogShuttleActionExecutor::~TRunLogShuttleActionExecutor() { for (TShuttlePtr& shuttle : AllShuttles) { shuttle->Kill(); } } template bool TRunLogShuttleActionExecutor::DoExecute(TOrbit& orbit, const TParams& params) { Y_UNUSED(params); if (TShuttlePtr shuttle = RentShuttle()) { this->Cast(shuttle)->SetIgnore(Action.GetIgnore()); orbit.AddShuttle(shuttle); } else { AtomicIncrement(MissedTracks); } return true; } template void TRunLogShuttleActionExecutor::DiscardShuttle() { AtomicIncrement(MissedTracks); } template void TRunLogShuttleActionExecutor::RecordShuttle(TLogShuttle* shuttle) { if (Depot == nullptr) { return; } typename TDepot::TAccessor a(*Depot); if (TTrackLog* trackLog = a.Add()) { *trackLog = shuttle->GetTrackLog(); trackLog->Id = AtomicIncrement(*LastTrackId); // Track id is assigned at reporting time } } template TShuttlePtr TRunLogShuttleActionExecutor::RentShuttle() { TGuard g(Lock); if (Parking.empty()) { return TShuttlePtr(); } else { TShuttlePtr shuttle = Parking.back(); Parking.pop_back(); return shuttle; } } template void TRunLogShuttleActionExecutor::ParkShuttle(TLogShuttle* shuttle) { shuttle->Clear(); TGuard g(Lock); Parking.emplace_back(shuttle); } template ui64 TRunLogShuttleActionExecutor::NewSpanId() { return LastSpanId ? AtomicIncrement(*LastSpanId) : 0; } //////////////////////////////////////////////////////////////////////////////// template TEditLogShuttleActionExecutor::TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action) : TLogShuttleActionBase(traceIdx) , Action(action) { } template bool TEditLogShuttleActionExecutor::DoExecute(TOrbit& orbit, const TParams& params) { Y_UNUSED(params); bool ignore = Action.GetIgnore(); orbit.ForEachShuttle(this->GetTraceIdx(), [=](IShuttle* shuttle) { this->Cast(shuttle)->SetIgnore(ignore); return true; }); return true; } //////////////////////////////////////////////////////////////////////////////// template TDropLogShuttleActionExecutor::TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action) : TLogShuttleActionBase(traceIdx) , Action(action) { } template bool TDropLogShuttleActionExecutor::DoExecute(TOrbit& orbit, const TParams& params) { Y_UNUSED(params); orbit.ForEachShuttle(this->GetTraceIdx(), [](IShuttle*) { return false; // Erase shuttle from orbit }); return true; } }