control.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #pragma once
  2. #include "custom_action.h"
  3. #include "event.h"
  4. #include "log.h"
  5. #include "log_shuttle.h"
  6. #include "probe.h"
  7. #include <library/cpp/lwtrace/protos/lwtrace.pb.h>
  8. #include <google/protobuf/repeated_field.h>
  9. #include <util/generic/deque.h>
  10. #include <util/generic/hash.h>
  11. #include <util/generic/noncopyable.h>
  12. #include <util/generic/ptr.h>
  13. #include <util/generic/set.h>
  14. #include <util/generic/vector.h>
  15. namespace NLWTrace {
  16. using TProbeMap = THashMap<std::pair<TString, TString>, TProbe*>;
  17. // Interface for probe ownership management
  18. class IBox: public virtual TThrRefBase {
  19. private:
  20. bool Owns;
  21. public:
  22. explicit IBox(bool ownsProbe = false)
  23. : Owns(ownsProbe)
  24. {
  25. }
  26. bool OwnsProbe() {
  27. return Owns;
  28. }
  29. virtual TProbe* GetProbe() = 0;
  30. };
  31. using TBoxPtr = TIntrusivePtr<IBox>;
  32. // Simple not-owning box, that just holds pointer to static/global probe (e.g. created by LWTRACE_DEFINE_PROVIDER)
  33. class TStaticBox: public IBox {
  34. private:
  35. TProbe* Probe;
  36. public:
  37. explicit TStaticBox(TProbe* probe)
  38. : IBox(false)
  39. , Probe(probe)
  40. {
  41. }
  42. TProbe* GetProbe() override {
  43. return Probe;
  44. }
  45. };
  46. // Just a set of unique probes
  47. // TODO[serxa]: get rid of different ProbeRegistries, use unique one (singleton) with auto registration
  48. class TProbeRegistry: public TNonCopyable {
  49. private:
  50. TMutex Mutex;
  51. // Probe* pointer uniquely identifies a probe and boxptr actually owns TProbe object (if required)
  52. using TProbes = THashMap<TProbe*, TBoxPtr>;
  53. TProbes Probes;
  54. // Probe provider-name pairs must be unique, keep track of them
  55. using TIds = TSet<std::pair<TString, TString>>;
  56. TIds Ids;
  57. public:
  58. // Add probes from null-terminated array of probe pointers. Probe can be added multiple times. Thread-safe.
  59. // Implies probes you pass will live forever (e.g. created by LWTRACE_DEFINE_PROVIDER)
  60. void AddProbesList(TProbe** reg);
  61. // Manage probes that are created/destructed dynamically
  62. void AddProbe(const TBoxPtr& box);
  63. void RemoveProbe(TProbe* probe);
  64. // Helper class to make thread-safe iteration over probes
  65. class TProbesAccessor {
  66. private:
  67. TGuard<TMutex> Guard;
  68. TProbes& Probes;
  69. public:
  70. explicit TProbesAccessor(TProbeRegistry* registry)
  71. : Guard(registry->Mutex)
  72. , Probes(registry->Probes)
  73. {}
  74. explicit TProbesAccessor(TProbeRegistry& registry)
  75. : TProbesAccessor(&registry)
  76. {}
  77. auto begin() { return Probes.begin(); }
  78. auto end() { return Probes.end(); }
  79. };
  80. friend class TProbesAccessor;
  81. private:
  82. void AddProbeNoLock(const TBoxPtr& box);
  83. void RemoveProbeNoLock(TProbe* probe);
  84. };
  85. // Represents a compiled trace query, holds executors attached to probes
  86. class TSession: public TNonCopyable {
  87. public:
  88. typedef THashMap<TString, TAtomicBase> TTraceVariables;
  89. private:
  90. const TInstant StartTime;
  91. const ui64 TraceIdx;
  92. TProbeRegistry& Registry;
  93. TDuration StoreDuration;
  94. TDuration ReadDuration;
  95. TCyclicLog CyclicLog;
  96. TDurationLog DurationLog;
  97. TCyclicDepot CyclicDepot;
  98. TDurationDepot DurationDepot;
  99. TAtomic LastTrackId;
  100. TAtomic LastSpanId;
  101. typedef TVector<std::pair<TProbe*, IExecutor*>> TProbes;
  102. TProbes Probes;
  103. bool Attached;
  104. NLWTrace::TQuery Query;
  105. TTraceVariables TraceVariables;
  106. TTraceResources TraceResources;
  107. void InsertExecutor(TTraceVariables& traceVariables,
  108. size_t bi, const NLWTrace::TPredicate* pred,
  109. const google::protobuf::RepeatedPtrField<NLWTrace::TAction>& actions,
  110. TProbe* probe, const bool destructiveActionsAllowed,
  111. const TCustomActionFactory& customActionFactory);
  112. private:
  113. void Destroy();
  114. public:
  115. TSession(ui64 traceIdx,
  116. TProbeRegistry& registry,
  117. const NLWTrace::TQuery& query,
  118. const bool destructiveActionsAllowed,
  119. const TCustomActionFactory& customActionFactory);
  120. ~TSession();
  121. void Detach();
  122. size_t GetEventsCount() const;
  123. size_t GetThreadsCount() const;
  124. const NLWTrace::TQuery& GetQuery() const {
  125. return Query;
  126. }
  127. TInstant GetStartTime() const {
  128. return StartTime;
  129. }
  130. ui64 GetTraceIdx() const {
  131. return TraceIdx;
  132. }
  133. TTraceResources& Resources() {
  134. return TraceResources;
  135. }
  136. const TTraceResources& Resources() const {
  137. return TraceResources;
  138. }
  139. template <class TReader>
  140. void ReadThreads(TReader& r) const {
  141. CyclicLog.ReadThreads(r);
  142. DurationLog.ReadThreads(r);
  143. }
  144. template <class TReader>
  145. void ReadItems(TReader& r) const {
  146. CyclicLog.ReadItems(r);
  147. DurationLog.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r);
  148. }
  149. template <class TReader>
  150. void ReadItems(ui64 now, ui64 duration, TReader& r) const {
  151. CyclicLog.ReadItems(r);
  152. DurationLog.ReadItems(now, duration, r);
  153. }
  154. template <class TReader>
  155. void ReadDepotThreads(TReader& r) const {
  156. CyclicDepot.ReadThreads(r);
  157. DurationDepot.ReadThreads(r);
  158. }
  159. template <class TReader>
  160. void ReadDepotItems(TReader& r) const {
  161. CyclicDepot.ReadItems(r);
  162. DurationDepot.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r);
  163. }
  164. template <class TReader>
  165. void ReadDepotItems(ui64 now, ui64 duration, TReader& r) const {
  166. CyclicDepot.ReadItems(r);
  167. DurationDepot.ReadItems(now, duration, r);
  168. }
  169. void ToProtobuf(TLogPb& pb) const;
  170. };
  171. // Deserialization result.
  172. // Either IsSuccess is true or FailedEventNames contains event names
  173. // we were not able to deserialize.
  174. struct TTraceDeserializeStatus
  175. {
  176. bool IsSuccess = true;
  177. TVector<TString> FailedEventNames;
  178. void AddFailedEventName(const TString& name)
  179. {
  180. IsSuccess = false;
  181. FailedEventNames.emplace_back(name);
  182. }
  183. };
  184. // Just a registry of all active trace queries
  185. // Facade for all interactions with probes/traces
  186. class TManager: public TNonCopyable {
  187. private:
  188. TProbeRegistry& Registry;
  189. TMutex Mtx;
  190. ui64 LastTraceIdx = 1;
  191. typedef THashMap<TString, TSession*> TTraces; // traceId -> TSession
  192. TTraces Traces;
  193. bool DestructiveActionsAllowed;
  194. TCustomActionFactory CustomActionFactory;
  195. THolder<TRunLogShuttleActionExecutor<TCyclicDepot>> SerializingExecutor;
  196. public:
  197. static constexpr ui64 RemoteTraceIdx = 0;
  198. public:
  199. TManager(TProbeRegistry& registry, bool allowDestructiveActions);
  200. ~TManager();
  201. bool HasTrace(const TString& id) const;
  202. const TSession* GetTrace(const TString& id) const;
  203. void New(const TString& id, const NLWTrace::TQuery& query);
  204. void Delete(const TString& id);
  205. void Stop(const TString& id);
  206. template <class TReader>
  207. void ReadProbes(TReader& reader) const {
  208. TProbeRegistry::TProbesAccessor probes(Registry);
  209. for (auto& kv : probes) {
  210. TProbe* probe = kv.first;
  211. reader.Push(probe);
  212. }
  213. }
  214. template <class TReader>
  215. void ReadTraces(TReader& reader) const {
  216. TGuard<TMutex> g(Mtx);
  217. for (const auto& Trace : Traces) {
  218. const TString& id = Trace.first;
  219. const TSession* trace = Trace.second;
  220. reader.Push(id, trace);
  221. }
  222. }
  223. template <class TReader>
  224. void ReadLog(const TString& id, TReader& reader) {
  225. TGuard<TMutex> g(Mtx);
  226. TTraces::iterator it = Traces.find(id);
  227. if (it == Traces.end()) {
  228. ythrow yexception() << "trace id '" << id << "' is not used";
  229. } else {
  230. TSession* trace = it->second;
  231. trace->ReadItems(reader);
  232. }
  233. }
  234. template <class TReader>
  235. void ReadLog(const TString& id, ui64 now, TReader& reader) {
  236. TGuard<TMutex> g(Mtx);
  237. TTraces::iterator it = Traces.find(id);
  238. if (it == Traces.end()) {
  239. ythrow yexception() << "trace id '" << id << "' is not used";
  240. } else {
  241. TSession* trace = it->second;
  242. trace->ReadItems(now, reader);
  243. }
  244. }
  245. template <class TReader>
  246. void ReadDepot(const TString& id, TReader& reader) {
  247. TGuard<TMutex> g(Mtx);
  248. TTraces::iterator it = Traces.find(id);
  249. if (it == Traces.end()) {
  250. ythrow yexception() << "trace id '" << id << "' is not used";
  251. } else {
  252. TSession* trace = it->second;
  253. trace->ReadDepotItems(reader);
  254. }
  255. }
  256. template <class TReader>
  257. void ReadDepot(const TString& id, ui64 now, TReader& reader) {
  258. TGuard<TMutex> g(Mtx);
  259. TTraces::iterator it = Traces.find(id);
  260. if (it == Traces.end()) {
  261. ythrow yexception() << "trace id '" << id << "' is not used";
  262. } else {
  263. TSession* trace = it->second;
  264. trace->ReadDepotItems(now, reader);
  265. }
  266. }
  267. bool GetDestructiveActionsAllowed() {
  268. return DestructiveActionsAllowed;
  269. }
  270. void RegisterCustomAction(const TString& name, const TCustomActionFactory::TCallback& callback) {
  271. CustomActionFactory.Register(name, callback);
  272. }
  273. template <class T>
  274. void RegisterCustomAction() {
  275. CustomActionFactory.Register(T::GetActionName(), [=](TProbe* probe, const TCustomAction& action, TSession* trace) {
  276. return new T(probe, action, trace);
  277. });
  278. }
  279. TProbeMap GetProbesMap();
  280. void CreateTraceRequest(TTraceRequest& msg, TOrbit& orbit);
  281. bool HandleTraceRequest(
  282. const TTraceRequest& msg,
  283. TOrbit& orbit);
  284. TTraceDeserializeStatus HandleTraceResponse(
  285. const TTraceResponse& msg,
  286. const TProbeMap& probesMap,
  287. TOrbit& orbit,
  288. i64 timeOffset = 0,
  289. double timeScale = 1);
  290. void CreateTraceResponse(
  291. TTraceResponse& msg,
  292. TOrbit& orbit);
  293. bool IsTraced(TOrbit& orbit) {
  294. return orbit.HasShuttle(TManager::RemoteTraceIdx);
  295. }
  296. };
  297. }