123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- #pragma once
- #include "log.h"
- #include "probe.h"
- #include <library/cpp/lwtrace/protos/lwtrace.pb.h>
- #include <util/system/spinlock.h>
- namespace NLWTrace {
- template <class TDepot>
- class TRunLogShuttleActionExecutor;
- ////////////////////////////////////////////////////////////////////////////////
- struct THostTimeCalculator {
- double K = 0;
- ui64 B = 0;
- THostTimeCalculator() {
- TInstant now = TInstant::Now();
- ui64 tsNow = GetCycleCount();
- K = 1000000000 / NHPTimer::GetClockRate();
- B = now.NanoSeconds() - K * tsNow;
- }
- ui64 CyclesToEpochNanoseconds(ui64 cycles) const {
- return K*cycles + B;
- }
- ui64 EpochNanosecondsToCycles(ui64 ns) const {
- return (ns - B) / K;
- }
- };
- inline ui64 CyclesToEpochNanoseconds(ui64 cycles) {
- return Singleton<THostTimeCalculator>()->CyclesToEpochNanoseconds(cycles);
- }
- inline ui64 EpochNanosecondsToCycles(ui64 ns) {
- return Singleton<THostTimeCalculator>()->EpochNanosecondsToCycles(ns);
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- class TLogShuttle: public IShuttle {
- private:
- using TExecutor = TRunLogShuttleActionExecutor<TDepot>;
- TTrackLog TrackLog;
- TExecutor* Executor;
- bool Ignore = false;
- size_t MaxTrackLength;
- TAdaptiveLock Lock;
- TAtomic ForkFailed = 0;
- public:
- explicit TLogShuttle(TExecutor* executor)
- : IShuttle(executor->GetTraceIdx(), executor->NewSpanId())
- , Executor(executor)
- , MaxTrackLength(Executor->GetAction().GetMaxTrackLength() ? Executor->GetAction().GetMaxTrackLength() : 100)
- {
- }
- bool DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) override;
- void DoEndOfTrack() override;
- void DoDrop() override;
- void DoSerialize(TShuttleTrace& msg) override;
- bool DoFork(TShuttlePtr& child) override;
- bool DoJoin(const TShuttlePtr& child) override;
- void SetIgnore(bool ignore);
- void Clear();
- const TTrackLog& GetTrackLog() const {
- return TrackLog;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- class TLogShuttleActionBase: public IExecutor {
- private:
- const ui64 TraceIdx;
- public:
- explicit TLogShuttleActionBase(ui64 traceIdx)
- : TraceIdx(traceIdx)
- {
- }
- ui64 GetTraceIdx() const {
- return TraceIdx;
- }
- static TLogShuttle<TDepot>* Cast(const TShuttlePtr& shuttle);
- static TLogShuttle<TDepot>* Cast(IShuttle* shuttle);
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- class TRunLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> {
- private:
- TSpinLock Lock;
- TVector<TShuttlePtr> AllShuttles;
- TVector<TShuttlePtr> Parking;
- TRunLogShuttleAction Action;
- TDepot* Depot;
- TAtomic MissedTracks = 0;
- TAtomic* LastTrackId;
- TAtomic* LastSpanId;
- static constexpr int MaxShuttles = 100000;
- public:
- TRunLogShuttleActionExecutor(ui64 traceIdx, const TRunLogShuttleAction& action, TDepot* depot, TAtomic* lastTrackId, TAtomic* lastSpanId);
- ~TRunLogShuttleActionExecutor();
- bool DoExecute(TOrbit& orbit, const TParams& params) override;
- void RecordShuttle(TLogShuttle<TDepot>* shuttle);
- void ParkShuttle(TLogShuttle<TDepot>* shuttle);
- void DiscardShuttle();
- TShuttlePtr RentShuttle();
- ui64 NewSpanId();
- const TRunLogShuttleAction& GetAction() const {
- return Action;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- class TEditLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> {
- private:
- TEditLogShuttleAction Action;
- public:
- TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action);
- bool DoExecute(TOrbit& orbit, const TParams& params) override;
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- class TDropLogShuttleActionExecutor: public TLogShuttleActionBase<TDepot> {
- private:
- TDropLogShuttleAction Action;
- public:
- TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action);
- bool DoExecute(TOrbit& orbit, const TParams& params) override;
- };
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- bool TLogShuttle<TDepot>::DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) {
- with_lock (Lock) {
- if (TrackLog.Items.size() >= MaxTrackLength) {
- TrackLog.Truncated = true;
- return true;
- }
- TrackLog.Items.emplace_back();
- TTrackLog::TItem* item = &TrackLog.Items.back();
- item->ThreadId = 0; // TODO[serxa]: check if it is fast to run TThread::CurrentThreadId();
- item->Probe = probe;
- if ((item->SavedParamsCount = probe->Event.Signature.ParamCount) > 0) {
- probe->Event.Signature.CloneParams(item->Params, params);
- }
- item->TimestampCycles = timestamp ? timestamp : GetCycleCount();
- }
- return true;
- }
- template <class TDepot>
- void TLogShuttle<TDepot>::DoEndOfTrack() {
- // Record track log if not ignored
- if (!Ignore) {
- if (AtomicGet(ForkFailed)) {
- Executor->DiscardShuttle();
- } else {
- Executor->RecordShuttle(this);
- }
- }
- Executor->ParkShuttle(this);
- }
- template <class TDepot>
- void TLogShuttle<TDepot>::DoDrop() {
- // Do not track log results of dropped shuttles
- Executor->ParkShuttle(this);
- }
- template <class TDepot>
- void TLogShuttle<TDepot>::SetIgnore(bool ignore) {
- Ignore = ignore;
- }
- template <class TDepot>
- void TLogShuttle<TDepot>::Clear() {
- TrackLog.Clear();
- AtomicSet(ForkFailed, 0);
- }
- template <class TDepot>
- void TLogShuttle<TDepot>::DoSerialize(TShuttleTrace& msg)
- {
- with_lock (Lock)
- {
- if (!GetTrackLog().Items.size()) {
- return ;
- }
- for (auto& record : GetTrackLog().Items) {
- auto *rec = msg.AddEvents();
- rec->SetName(record.Probe->Event.Name);
- rec->SetProvider(record.Probe->Event.GetProvider());
- rec->SetTimestampNanosec(
- CyclesToEpochNanoseconds(record.TimestampCycles));
- record.Probe->Event.Signature.SerializeToPb(record.Params, *rec->MutableParams());
- }
- }
- }
- template <class TDepot>
- TLogShuttle<TDepot>* TLogShuttleActionBase<TDepot>::Cast(const TShuttlePtr& shuttle) {
- return static_cast<TLogShuttle<TDepot>*>(shuttle.Get());
- }
- template <class TDepot>
- TLogShuttle<TDepot>* TLogShuttleActionBase<TDepot>::Cast(IShuttle* shuttle) {
- return static_cast<TLogShuttle<TDepot>*>(shuttle);
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- TRunLogShuttleActionExecutor<TDepot>::TRunLogShuttleActionExecutor(
- ui64 traceIdx,
- const TRunLogShuttleAction& action,
- TDepot* depot,
- TAtomic* lastTrackId,
- TAtomic* lastSpanId)
- : TLogShuttleActionBase<TDepot>(traceIdx)
- , Action(action)
- , Depot(depot)
- , LastTrackId(lastTrackId)
- , LastSpanId(lastSpanId)
- {
- ui64 size = Min<ui64>(Action.GetShuttlesCount() ? Action.GetShuttlesCount() : 1000, MaxShuttles); // Do not allow to allocate too much memory
- AllShuttles.reserve(size);
- Parking.reserve(size);
- for (ui64 i = 0; i < size; i++) {
- TShuttlePtr shuttle(new TLogShuttle<TDepot>(this));
- AllShuttles.emplace_back(shuttle);
- Parking.emplace_back(shuttle);
- }
- }
- template <class TDepot>
- TRunLogShuttleActionExecutor<TDepot>::~TRunLogShuttleActionExecutor() {
- for (TShuttlePtr& shuttle : AllShuttles) {
- shuttle->Kill();
- }
- }
- template <class TDepot>
- bool TRunLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) {
- Y_UNUSED(params);
- if (TShuttlePtr shuttle = RentShuttle()) {
- this->Cast(shuttle)->SetIgnore(Action.GetIgnore());
- orbit.AddShuttle(shuttle);
- } else {
- AtomicIncrement(MissedTracks);
- }
- return true;
- }
- template <class TDepot>
- void TRunLogShuttleActionExecutor<TDepot>::DiscardShuttle() {
- AtomicIncrement(MissedTracks);
- }
- template <class TDepot>
- void TRunLogShuttleActionExecutor<TDepot>::RecordShuttle(TLogShuttle<TDepot>* shuttle) {
- if (Depot == nullptr) {
- return;
- }
- typename TDepot::TAccessor a(*Depot);
- if (TTrackLog* trackLog = a.Add()) {
- *trackLog = shuttle->GetTrackLog();
- trackLog->Id = AtomicIncrement(*LastTrackId); // Track id is assigned at reporting time
- }
- }
- template <class TDepot>
- TShuttlePtr TRunLogShuttleActionExecutor<TDepot>::RentShuttle() {
- TGuard<TSpinLock> g(Lock);
- if (Parking.empty()) {
- return TShuttlePtr();
- } else {
- TShuttlePtr shuttle = Parking.back();
- Parking.pop_back();
- return shuttle;
- }
- }
- template <class TDepot>
- void TRunLogShuttleActionExecutor<TDepot>::ParkShuttle(TLogShuttle<TDepot>* shuttle) {
- shuttle->Clear();
- TGuard<TSpinLock> g(Lock);
- Parking.emplace_back(shuttle);
- }
- template <class TDepot>
- ui64 TRunLogShuttleActionExecutor<TDepot>::NewSpanId()
- {
- return LastSpanId ? AtomicIncrement(*LastSpanId) : 0;
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- TEditLogShuttleActionExecutor<TDepot>::TEditLogShuttleActionExecutor(ui64 traceIdx, const TEditLogShuttleAction& action)
- : TLogShuttleActionBase<TDepot>(traceIdx)
- , Action(action)
- {
- }
- template <class TDepot>
- bool TEditLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) {
- Y_UNUSED(params);
- bool ignore = Action.GetIgnore();
- orbit.ForEachShuttle(this->GetTraceIdx(), [=](IShuttle* shuttle) {
- this->Cast(shuttle)->SetIgnore(ignore);
- return true;
- });
- return true;
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <class TDepot>
- TDropLogShuttleActionExecutor<TDepot>::TDropLogShuttleActionExecutor(ui64 traceIdx, const TDropLogShuttleAction& action)
- : TLogShuttleActionBase<TDepot>(traceIdx)
- , Action(action)
- {
- }
- template <class TDepot>
- bool TDropLogShuttleActionExecutor<TDepot>::DoExecute(TOrbit& orbit, const TParams& params) {
- Y_UNUSED(params);
- orbit.ForEachShuttle(this->GetTraceIdx(), [](IShuttle*) {
- return false; // Erase shuttle from orbit
- });
- return true;
- }
- }
|