#pragma once #include "custom_action.h" #include "event.h" #include "log.h" #include "log_shuttle.h" #include "probe.h" #include #include #include #include #include #include #include #include namespace NLWTrace { using TProbeMap = THashMap, TProbe*>; // Interface for probe ownership management class IBox: public virtual TThrRefBase { private: bool Owns; public: explicit IBox(bool ownsProbe = false) : Owns(ownsProbe) { } bool OwnsProbe() { return Owns; } virtual TProbe* GetProbe() = 0; }; using TBoxPtr = TIntrusivePtr; // Simple not-owning box, that just holds pointer to static/global probe (e.g. created by LWTRACE_DEFINE_PROVIDER) class TStaticBox: public IBox { private: TProbe* Probe; public: explicit TStaticBox(TProbe* probe) : IBox(false) , Probe(probe) { } TProbe* GetProbe() override { return Probe; } }; // Just a set of unique probes // TODO[serxa]: get rid of different ProbeRegistries, use unique one (singleton) with auto registration class TProbeRegistry: public TNonCopyable { private: TMutex Mutex; // Probe* pointer uniquely identifies a probe and boxptr actually owns TProbe object (if required) using TProbes = THashMap; TProbes Probes; // Probe provider-name pairs must be unique, keep track of them using TIds = TSet>; TIds Ids; public: // Add probes from null-terminated array of probe pointers. Probe can be added multiple times. Thread-safe. // Implies probes you pass will live forever (e.g. created by LWTRACE_DEFINE_PROVIDER) void AddProbesList(TProbe** reg); // Manage probes that are created/destructed dynamically void AddProbe(const TBoxPtr& box); void RemoveProbe(TProbe* probe); // Helper class to make thread-safe iteration over probes class TProbesAccessor { private: TGuard Guard; TProbes& Probes; public: explicit TProbesAccessor(TProbeRegistry* registry) : Guard(registry->Mutex) , Probes(registry->Probes) {} explicit TProbesAccessor(TProbeRegistry& registry) : TProbesAccessor(®istry) {} auto begin() { return Probes.begin(); } auto end() { return Probes.end(); } }; friend class TProbesAccessor; private: void AddProbeNoLock(const TBoxPtr& box); void RemoveProbeNoLock(TProbe* probe); }; // Represents a compiled trace query, holds executors attached to probes class TSession: public TNonCopyable { public: typedef THashMap TTraceVariables; private: const TInstant StartTime; const ui64 TraceIdx; TProbeRegistry& Registry; TDuration StoreDuration; TDuration ReadDuration; TCyclicLog CyclicLog; TDurationLog DurationLog; TCyclicDepot CyclicDepot; TDurationDepot DurationDepot; TAtomic LastTrackId; TAtomic LastSpanId; typedef TVector> TProbes; TProbes Probes; bool Attached; NLWTrace::TQuery Query; TTraceVariables TraceVariables; TTraceResources TraceResources; void InsertExecutor(TTraceVariables& traceVariables, size_t bi, const NLWTrace::TPredicate* pred, const google::protobuf::RepeatedPtrField& actions, TProbe* probe, const bool destructiveActionsAllowed, const TCustomActionFactory& customActionFactory); private: void Destroy(); public: TSession(ui64 traceIdx, TProbeRegistry& registry, const NLWTrace::TQuery& query, const bool destructiveActionsAllowed, const TCustomActionFactory& customActionFactory); ~TSession(); void Detach(); size_t GetEventsCount() const; size_t GetThreadsCount() const; const NLWTrace::TQuery& GetQuery() const { return Query; } TInstant GetStartTime() const { return StartTime; } ui64 GetTraceIdx() const { return TraceIdx; } TTraceResources& Resources() { return TraceResources; } const TTraceResources& Resources() const { return TraceResources; } template void ReadThreads(TReader& r) const { CyclicLog.ReadThreads(r); DurationLog.ReadThreads(r); } template void ReadItems(TReader& r) const { CyclicLog.ReadItems(r); DurationLog.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r); } template void ReadItems(ui64 now, ui64 duration, TReader& r) const { CyclicLog.ReadItems(r); DurationLog.ReadItems(now, duration, r); } template void ReadDepotThreads(TReader& r) const { CyclicDepot.ReadThreads(r); DurationDepot.ReadThreads(r); } template void ReadDepotItems(TReader& r) const { CyclicDepot.ReadItems(r); DurationDepot.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r); } template void ReadDepotItems(ui64 now, ui64 duration, TReader& r) const { CyclicDepot.ReadItems(r); DurationDepot.ReadItems(now, duration, r); } template void ExtractItemsFromCyclicDepot(TReader& r) const { CyclicDepot.ExtractItems(r); } void ToProtobuf(TLogPb& pb) const; }; // Deserialization result. // Either IsSuccess is true or FailedEventNames contains event names // we were not able to deserialize. struct TTraceDeserializeStatus { bool IsSuccess = true; TVector FailedEventNames; void AddFailedEventName(const TString& name) { IsSuccess = false; FailedEventNames.emplace_back(name); } }; // Just a registry of all active trace queries // Facade for all interactions with probes/traces class TManager: public TNonCopyable { private: TProbeRegistry& Registry; TMutex Mtx; ui64 LastTraceIdx = 1; typedef THashMap TTraces; // traceId -> TSession TTraces Traces; bool DestructiveActionsAllowed; TCustomActionFactory CustomActionFactory; THolder> SerializingExecutor; public: static constexpr ui64 RemoteTraceIdx = 0; public: TManager(TProbeRegistry& registry, bool allowDestructiveActions); ~TManager(); bool HasTrace(const TString& id) const; const TSession* GetTrace(const TString& id) const; void New(const TString& id, const NLWTrace::TQuery& query); void Delete(const TString& id); void Stop(const TString& id); template void ReadProbes(TReader& reader) const { TProbeRegistry::TProbesAccessor probes(Registry); for (auto& kv : probes) { TProbe* probe = kv.first; reader.Push(probe); } } template void ReadTraces(TReader& reader) const { TGuard g(Mtx); for (const auto& Trace : Traces) { const TString& id = Trace.first; const TSession* trace = Trace.second; reader.Push(id, trace); } } template void ReadLog(const TString& id, TReader& reader) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { TSession* trace = it->second; trace->ReadItems(reader); } } template void ReadLog(const TString& id, ui64 now, TReader& reader) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { TSession* trace = it->second; trace->ReadItems(now, reader); } } template void ReadDepot(const TString& id, TReader& reader) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { TSession* trace = it->second; trace->ReadDepotItems(reader); } } template void ReadDepot(const TString& id, ui64 now, TReader& reader) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { TSession* trace = it->second; trace->ReadDepotItems(now, reader); } } template void ExtractItemsFromCyclicDepot(const TString& id, TReader& reader) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { TSession* trace = it->second; trace->ExtractItemsFromCyclicDepot(reader); } } bool GetDestructiveActionsAllowed() { return DestructiveActionsAllowed; } void RegisterCustomAction(const TString& name, const TCustomActionFactory::TCallback& callback) { CustomActionFactory.Register(name, callback); } template void RegisterCustomAction() { CustomActionFactory.Register(T::GetActionName(), [=](TProbe* probe, const TCustomAction& action, TSession* trace) { return new T(probe, action, trace); }); } TProbeMap GetProbesMap(); void CreateTraceRequest(TTraceRequest& msg, TOrbit& orbit); bool HandleTraceRequest( const TTraceRequest& msg, TOrbit& orbit); TTraceDeserializeStatus HandleTraceResponse( const TTraceResponse& msg, const TProbeMap& probesMap, TOrbit& orbit, i64 timeOffset = 0, double timeScale = 1); void CreateTraceResponse( TTraceResponse& msg, TOrbit& orbit); bool IsTraced(TOrbit& orbit) { return orbit.HasShuttle(TManager::RemoteTraceIdx); } }; }