control.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. #pragma once
  2. #include "custom_action.h"
  3. #include "event.h"
  4. #include "log.h"
  5. #include "log_shuttle.h"
  6. #include "probe.h"
  7. #include <library/cpp/lwtrace/protos/lwtrace.pb.h>
  8. #include <google/protobuf/repeated_field.h>
  9. #include <util/generic/deque.h>
  10. #include <util/generic/hash.h>
  11. #include <util/generic/noncopyable.h>
  12. #include <util/generic/ptr.h>
  13. #include <util/generic/set.h>
  14. #include <util/generic/vector.h>
  15. namespace NLWTrace {
  // Lookup table from a pair of strings to a probe pointer.
  // NOTE(review): the key is presumably (provider name, probe name) -- confirm against TManager::GetProbesMap().
  using TProbeMap = THashMap<std::pair<TString, TString>, TProbe*>;
  17. // Interface for probe ownership management
  18. class IBox: public virtual TThrRefBase {
  19. private:
  20. bool Owns;
  21. public:
  22. explicit IBox(bool ownsProbe = false)
  23. : Owns(ownsProbe)
  24. {
  25. }
  26. bool OwnsProbe() {
  27. return Owns;
  28. }
  29. virtual TProbe* GetProbe() = 0;
  30. };
  31. using TBoxPtr = TIntrusivePtr<IBox>;
  32. // Simple not-owning box, that just holds pointer to static/global probe (e.g. created by LWTRACE_DEFINE_PROVIDER)
  33. class TStaticBox: public IBox {
  34. private:
  35. TProbe* Probe;
  36. public:
  37. explicit TStaticBox(TProbe* probe)
  38. : IBox(false)
  39. , Probe(probe)
  40. {
  41. }
  42. TProbe* GetProbe() override {
  43. return Probe;
  44. }
  45. };
  46. // Just a set of unique probes
  47. // TODO[serxa]: get rid of different ProbeRegistries, use unique one (singleton) with auto registration
  48. class TProbeRegistry: public TNonCopyable {
  49. private:
  50. TMutex Mutex;
  51. // Probe* pointer uniquely identifies a probe and boxptr actually owns TProbe object (if required)
  52. using TProbes = THashMap<TProbe*, TBoxPtr>;
  53. TProbes Probes;
  54. // Probe provider-name pairs must be unique, keep track of them
  55. using TIds = TSet<std::pair<TString, TString>>;
  56. TIds Ids;
  57. public:
  58. // Add probes from null-terminated array of probe pointers. Probe can be added multiple times. Thread-safe.
  59. // Implies probes you pass will live forever (e.g. created by LWTRACE_DEFINE_PROVIDER)
  60. void AddProbesList(TProbe** reg);
  61. // Manage probes that are created/destructed dynamically
  62. void AddProbe(const TBoxPtr& box);
  63. void RemoveProbe(TProbe* probe);
  64. // Helper class to make thread-safe iteration over probes
  65. class TProbesAccessor {
  66. private:
  67. TGuard<TMutex> Guard;
  68. TProbes& Probes;
  69. public:
  70. explicit TProbesAccessor(TProbeRegistry* registry)
  71. : Guard(registry->Mutex)
  72. , Probes(registry->Probes)
  73. {}
  74. explicit TProbesAccessor(TProbeRegistry& registry)
  75. : TProbesAccessor(&registry)
  76. {}
  77. auto begin() { return Probes.begin(); }
  78. auto end() { return Probes.end(); }
  79. };
  80. friend class TProbesAccessor;
  81. private:
  82. void AddProbeNoLock(const TBoxPtr& box);
  83. void RemoveProbeNoLock(TProbe* probe);
  84. };
  85. // Represents a compiled trace query, holds executors attached to probes
  86. class TSession: public TNonCopyable {
  87. public:
  88. typedef THashMap<TString, TAtomicBase> TTraceVariables;
  89. private:
  90. const TInstant StartTime;
  91. const ui64 TraceIdx;
  92. TProbeRegistry& Registry;
  93. TDuration StoreDuration;
  94. TDuration ReadDuration;
  95. TCyclicLog CyclicLog;
  96. TDurationLog DurationLog;
  97. TCyclicDepot CyclicDepot;
  98. TDurationDepot DurationDepot;
  99. TAtomic LastTrackId;
  100. TAtomic LastSpanId;
  101. typedef TVector<std::pair<TProbe*, IExecutor*>> TProbes;
  102. TProbes Probes;
  103. bool Attached;
  104. NLWTrace::TQuery Query;
  105. TTraceVariables TraceVariables;
  106. TTraceResources TraceResources;
  107. void InsertExecutor(TTraceVariables& traceVariables,
  108. size_t bi, const NLWTrace::TPredicate* pred,
  109. const google::protobuf::RepeatedPtrField<NLWTrace::TAction>& actions,
  110. TProbe* probe, const bool destructiveActionsAllowed,
  111. const TCustomActionFactory& customActionFactory);
  112. private:
  113. void Destroy();
  114. public:
  115. TSession(ui64 traceIdx,
  116. TProbeRegistry& registry,
  117. const NLWTrace::TQuery& query,
  118. const bool destructiveActionsAllowed,
  119. const TCustomActionFactory& customActionFactory);
  120. ~TSession();
  121. void Detach();
  122. size_t GetEventsCount() const;
  123. size_t GetThreadsCount() const;
  124. const NLWTrace::TQuery& GetQuery() const {
  125. return Query;
  126. }
  127. TInstant GetStartTime() const {
  128. return StartTime;
  129. }
  130. ui64 GetTraceIdx() const {
  131. return TraceIdx;
  132. }
  133. TTraceResources& Resources() {
  134. return TraceResources;
  135. }
  136. const TTraceResources& Resources() const {
  137. return TraceResources;
  138. }
  139. template <class TReader>
  140. void ReadThreads(TReader& r) const {
  141. CyclicLog.ReadThreads(r);
  142. DurationLog.ReadThreads(r);
  143. }
  144. template <class TReader>
  145. void ReadItems(TReader& r) const {
  146. CyclicLog.ReadItems(r);
  147. DurationLog.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r);
  148. }
  149. template <class TReader>
  150. void ReadItems(ui64 now, ui64 duration, TReader& r) const {
  151. CyclicLog.ReadItems(r);
  152. DurationLog.ReadItems(now, duration, r);
  153. }
  154. template <class TReader>
  155. void ReadDepotThreads(TReader& r) const {
  156. CyclicDepot.ReadThreads(r);
  157. DurationDepot.ReadThreads(r);
  158. }
  159. template <class TReader>
  160. void ReadDepotItems(TReader& r) const {
  161. CyclicDepot.ReadItems(r);
  162. DurationDepot.ReadItems(GetCycleCount(), DurationToCycles(ReadDuration), r);
  163. }
  164. template <class TReader>
  165. void ReadDepotItems(ui64 now, ui64 duration, TReader& r) const {
  166. CyclicDepot.ReadItems(r);
  167. DurationDepot.ReadItems(now, duration, r);
  168. }
  169. template <class TReader>
  170. void ExtractItemsFromCyclicDepot(TReader& r) const {
  171. CyclicDepot.ExtractItems(r);
  172. }
  173. void ToProtobuf(TLogPb& pb) const;
  174. };
  175. // Deserialization result.
  176. // Either IsSuccess is true or FailedEventNames contains event names
  177. // we were not able to deserialize.
  178. struct TTraceDeserializeStatus
  179. {
  180. bool IsSuccess = true;
  181. TVector<TString> FailedEventNames;
  182. void AddFailedEventName(const TString& name)
  183. {
  184. IsSuccess = false;
  185. FailedEventNames.emplace_back(name);
  186. }
  187. };
  188. // Just a registry of all active trace queries
  189. // Facade for all interactions with probes/traces
  190. class TManager: public TNonCopyable {
  191. private:
  192. TProbeRegistry& Registry;
  193. TMutex Mtx;
  194. ui64 LastTraceIdx = 1;
  195. typedef THashMap<TString, TSession*> TTraces; // traceId -> TSession
  196. TTraces Traces;
  197. bool DestructiveActionsAllowed;
  198. TCustomActionFactory CustomActionFactory;
  199. THolder<TRunLogShuttleActionExecutor<TCyclicDepot>> SerializingExecutor;
  200. public:
  201. static constexpr ui64 RemoteTraceIdx = 0;
  202. public:
  203. TManager(TProbeRegistry& registry, bool allowDestructiveActions);
  204. ~TManager();
  205. bool HasTrace(const TString& id) const;
  206. const TSession* GetTrace(const TString& id) const;
  207. void New(const TString& id, const NLWTrace::TQuery& query);
  208. void Delete(const TString& id);
  209. void Stop(const TString& id);
  210. template <class TReader>
  211. void ReadProbes(TReader& reader) const {
  212. TProbeRegistry::TProbesAccessor probes(Registry);
  213. for (auto& kv : probes) {
  214. TProbe* probe = kv.first;
  215. reader.Push(probe);
  216. }
  217. }
  218. template <class TReader>
  219. void ReadTraces(TReader& reader) const {
  220. TGuard<TMutex> g(Mtx);
  221. for (const auto& Trace : Traces) {
  222. const TString& id = Trace.first;
  223. const TSession* trace = Trace.second;
  224. reader.Push(id, trace);
  225. }
  226. }
  227. template <class TReader>
  228. void ReadLog(const TString& id, TReader& reader) {
  229. TGuard<TMutex> g(Mtx);
  230. TTraces::iterator it = Traces.find(id);
  231. if (it == Traces.end()) {
  232. ythrow yexception() << "trace id '" << id << "' is not used";
  233. } else {
  234. TSession* trace = it->second;
  235. trace->ReadItems(reader);
  236. }
  237. }
  238. template <class TReader>
  239. void ReadLog(const TString& id, ui64 now, TReader& reader) {
  240. TGuard<TMutex> g(Mtx);
  241. TTraces::iterator it = Traces.find(id);
  242. if (it == Traces.end()) {
  243. ythrow yexception() << "trace id '" << id << "' is not used";
  244. } else {
  245. TSession* trace = it->second;
  246. trace->ReadItems(now, reader);
  247. }
  248. }
  249. template <class TReader>
  250. void ReadDepot(const TString& id, TReader& reader) {
  251. TGuard<TMutex> g(Mtx);
  252. TTraces::iterator it = Traces.find(id);
  253. if (it == Traces.end()) {
  254. ythrow yexception() << "trace id '" << id << "' is not used";
  255. } else {
  256. TSession* trace = it->second;
  257. trace->ReadDepotItems(reader);
  258. }
  259. }
  260. template <class TReader>
  261. void ReadDepot(const TString& id, ui64 now, TReader& reader) {
  262. TGuard<TMutex> g(Mtx);
  263. TTraces::iterator it = Traces.find(id);
  264. if (it == Traces.end()) {
  265. ythrow yexception() << "trace id '" << id << "' is not used";
  266. } else {
  267. TSession* trace = it->second;
  268. trace->ReadDepotItems(now, reader);
  269. }
  270. }
  271. template <class TReader>
  272. void ExtractItemsFromCyclicDepot(const TString& id, TReader& reader) {
  273. TGuard<TMutex> g(Mtx);
  274. TTraces::iterator it = Traces.find(id);
  275. if (it == Traces.end()) {
  276. ythrow yexception() << "trace id '" << id << "' is not used";
  277. } else {
  278. TSession* trace = it->second;
  279. trace->ExtractItemsFromCyclicDepot(reader);
  280. }
  281. }
  282. bool GetDestructiveActionsAllowed() {
  283. return DestructiveActionsAllowed;
  284. }
  285. void RegisterCustomAction(const TString& name, const TCustomActionFactory::TCallback& callback) {
  286. CustomActionFactory.Register(name, callback);
  287. }
  288. template <class T>
  289. void RegisterCustomAction() {
  290. CustomActionFactory.Register(T::GetActionName(), [=](TProbe* probe, const TCustomAction& action, TSession* trace) {
  291. return new T(probe, action, trace);
  292. });
  293. }
  294. TProbeMap GetProbesMap();
  295. void CreateTraceRequest(TTraceRequest& msg, TOrbit& orbit);
  296. bool HandleTraceRequest(
  297. const TTraceRequest& msg,
  298. TOrbit& orbit);
  299. TTraceDeserializeStatus HandleTraceResponse(
  300. const TTraceResponse& msg,
  301. const TProbeMap& probesMap,
  302. TOrbit& orbit,
  303. i64 timeOffset = 0,
  304. double timeScale = 1);
  305. void CreateTraceResponse(
  306. TTraceResponse& msg,
  307. TOrbit& orbit);
  308. bool IsTraced(TOrbit& orbit) {
  309. return orbit.HasShuttle(TManager::RemoteTraceIdx);
  310. }
  311. };
  312. }