trace.cpp 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051
  1. #include "all.h"
  2. #include "kill_action.h"
  3. #include "log_shuttle.h"
  4. #include "preprocessor.h"
  5. #include "sleep_action.h"
  6. #include "stderr_writer.h"
  7. #include "google/protobuf/repeated_field.h"
  8. #include <util/generic/map.h>
  9. #include <util/random/random.h>
  10. #include <functional>
  11. namespace NLWTrace {
  12. #ifndef LWTRACE_DISABLE
  13. // Define static strings for name of each parameter type
  14. #define FOREACH_PARAMTYPE_MACRO(n, t, v) \
  15. const char* TParamType<t>::NameString = n; \
  16. /**/
  17. FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
  18. FOR_NIL_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
  19. #undef FOREACH_PARAMTYPE_MACRO
  20. #endif
  21. void TProbeRegistry::AddProbesList(TProbe** reg) {
  22. TGuard<TMutex> g(Mutex);
  23. if (reg == nullptr) {
  24. return;
  25. }
  26. for (TProbe** i = reg; *i != nullptr; i++) {
  27. AddProbeNoLock(new TStaticBox(*i));
  28. }
  29. }
  30. void TProbeRegistry::AddProbe(const TBoxPtr& box) {
  31. TGuard<TMutex> g(Mutex);
  32. AddProbeNoLock(box);
  33. }
  34. void TProbeRegistry::RemoveProbe(TProbe* probe) {
  35. TGuard<TMutex> g(Mutex);
  36. RemoveProbeNoLock(probe);
  37. }
  38. void TProbeRegistry::AddProbeNoLock(const TBoxPtr& box) {
  39. TProbe* probe = box->GetProbe();
  40. if (Probes.contains(probe)) {
  41. return; // silently skip probe double registration
  42. }
  43. TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
  44. Y_ABORT_UNLESS(Ids.count(key) == 0, "duplicate provider:probe pair %s:%s", key.first.data(), key.second.data());
  45. Probes.emplace(probe, box);
  46. Ids.insert(key);
  47. }
  48. void TProbeRegistry::RemoveProbeNoLock(TProbe* probe) {
  49. auto iter = Probes.find(probe);
  50. if (iter != Probes.end()) {
  51. TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
  52. Ids.erase(key);
  53. Probes.erase(iter);
  54. } else {
  55. // silently skip probe double unregistration
  56. }
  57. }
  58. TAtomic* GetVariablePtr(TSession::TTraceVariables& traceVariables, const TString& name) {
  59. TSession::TTraceVariables::iterator it = traceVariables.find(name);
  60. if (it == traceVariables.end()) {
  61. TAtomicBase zero = 0;
  62. traceVariables[name] = zero;
  63. return &traceVariables[name];
  64. }
  65. return &((*it).second);
  66. }
  // Where an operand of a predicate/statement takes its value from.
  enum EOperandType {
      OT_LITERAL = 0,   // constant given in the query
      OT_PARAMETER = 1, // parameter of the probe being executed
      OT_VARIABLE = 2   // named trace variable
  };
  72. template <class T, EOperandType>
  73. class TOperand;
  74. template <class T>
  75. class TOperand<T, OT_LITERAL> {
  76. private:
  77. T ImmediateValue;
  78. public:
  79. TOperand(TSession::TTraceVariables&, const TString&, const TString& value, size_t) {
  80. ImmediateValue = TParamConv<T>::FromString(value);
  81. }
  82. const T& Get(const TParams&) {
  83. return ImmediateValue;
  84. }
  85. };
  86. template <class T>
  87. class TOperand<T, OT_PARAMETER> {
  88. private:
  89. size_t Idx;
  90. public:
  91. TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t idx) {
  92. Idx = idx;
  93. }
  94. const T& Get(const TParams& params) {
  95. return params.Param[Idx].template Get<T>();
  96. }
  97. };
  98. template <class T>
  99. class TOperand<T, OT_VARIABLE> {
  100. private:
  101. TAtomic* Variable;
  102. public:
  103. TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) {
  104. Variable = GetVariablePtr(traceVariables, name);
  105. }
  106. const T Get(const TParams&) {
  107. return (T)AtomicGet(*Variable);
  108. }
  109. void Set(const T& value) {
  110. AtomicSet(*Variable, value);
  111. }
  112. void Inc() {
  113. AtomicIncrement(*Variable);
  114. }
  115. void Dec() {
  116. AtomicDecrement(*Variable);
  117. }
  118. void Add(const TAtomicBase value) {
  119. AtomicAdd(*Variable, value);
  120. }
  121. void Sub(const TAtomicBase value) {
  122. AtomicSub(*Variable, value);
  123. }
  124. };
  125. template <>
  126. class TOperand<TCheck, OT_VARIABLE> {
  127. private:
  128. TAtomic* Variable;
  129. public:
  130. TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) {
  131. Variable = GetVariablePtr(traceVariables, name);
  132. }
  133. const TCheck Get(const TParams&) {
  134. return TCheck(AtomicGet(*Variable));
  135. }
  136. void Set(const TCheck& value) {
  137. AtomicSet(*Variable, value.Value);
  138. }
  139. void Add(const TCheck& value) {
  140. AtomicAdd(*Variable, value.Value);
  141. }
  142. void Sub(const TCheck value) {
  143. AtomicSub(*Variable, value.Value);
  144. }
  145. void Inc() {
  146. AtomicIncrement(*Variable);
  147. }
  148. void Dec() {
  149. AtomicDecrement(*Variable);
  150. }
  151. };
  152. template <>
  153. class TOperand<TString, OT_VARIABLE> {
  154. private:
  155. TString Dummy;
  156. public:
  157. TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
  158. }
  159. const TString Get(const TParams&) {
  160. return Dummy;
  161. }
  162. void Set(const TString&) {
  163. }
  164. };
  165. template <>
  166. class TOperand<TSymbol, OT_VARIABLE> {
  167. private:
  168. TSymbol Dummy;
  169. public:
  170. TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
  171. }
  172. const TSymbol Get(const TParams&) {
  173. return Dummy;
  174. }
  175. void Set(const TSymbol&) {
  176. }
  177. };
  178. // IOperandGetter: hide concrete EOperandType, to save compilation time
  179. template <class T>
  180. struct IOperandGetter {
  181. virtual const T Get(const TParams& params) = 0;
  182. virtual ~IOperandGetter() {
  183. }
  184. };
  185. template <class T, EOperandType TParam>
  186. class TOperandGetter: public IOperandGetter<T> {
  187. private:
  188. TOperand<T, TParam> Op;
  189. public:
  190. TOperandGetter(const TOperand<T, TParam>& op)
  191. : Op(op)
  192. {
  193. }
  194. const T Get(const TParams& params) override {
  195. return Op.Get(params);
  196. }
  197. };
  198. template <class T>
  199. class TReceiver: public TOperand<T, OT_VARIABLE> {
  200. public:
  201. TReceiver(TSession::TTraceVariables& traceVariables, const TString& name)
  202. : TOperand<T, OT_VARIABLE>(traceVariables, name, nullptr, 0)
  203. {
  204. }
  205. };
  // Applies a default-constructed TPredicate to (a, b) and returns its verdict.
  template <class TP, class TPredicate>
  static bool CmpFunc(TP a, TP b) {
      TPredicate predicate = TPredicate();
      return predicate(a, b);
  }
  210. template <class TP, class TFunc, EOperandType TLhs, EOperandType TRhs>
  211. class TOperatorExecutor: public IExecutor {
  212. private:
  213. bool InvertCompare;
  214. TOperand<TP, TLhs> Lhs;
  215. TOperand<TP, TRhs> Rhs;
  216. bool DoExecute(TOrbit&, const TParams& params) override {
  217. return TFunc()(Lhs.Get(params), Rhs.Get(params)) != InvertCompare;
  218. }
  219. public:
  220. TOperatorExecutor(const TOperand<TP, TLhs>& lhs, const TOperand<TP, TRhs>& rhs, bool invertCompare)
  221. : InvertCompare(invertCompare)
  222. , Lhs(lhs)
  223. , Rhs(rhs)
  224. {
  225. }
  226. };
  // Functor for "receiver += amount": forwards to receiver.Add(amount).
  template <class TR, class TP>
  struct TAddEq {
      void operator()(TR& receiver, TP amount) const {
          receiver.Add(amount);
      }
  };
  // Functor for "receiver -= amount": forwards to receiver.Sub(amount).
  template <class TR, class TP>
  struct TSubEq {
      void operator()(TR& receiver, TP amount) const {
          receiver.Sub(amount);
      }
  };
  // Functor for "++receiver": forwards to receiver.Inc().
  template <class TR>
  struct TInc {
      void operator()(TR& receiver) const {
          receiver.Inc();
      }
  };
  // Functor for "--receiver": forwards to receiver.Dec().
  template <class TR>
  struct TDec {
      void operator()(TR& receiver) const {
          receiver.Dec();
      }
  };
  251. template <class TP, class TFunc>
  252. class TUnaryInplaceStatementExecutor: public IExecutor {
  253. private:
  254. TFunc Func;
  255. TReceiver<TP> Receiver;
  256. bool DoExecute(TOrbit&, const TParams&) override {
  257. Func(Receiver);
  258. return true;
  259. }
  260. public:
  261. TUnaryInplaceStatementExecutor(TReceiver<TP>& receiver)
  262. : Receiver(receiver)
  263. {
  264. }
  265. };
  266. template <class TP, class TFunc, EOperandType TParam>
  267. class TBinaryInplaceStatementExecutor: public IExecutor {
  268. private:
  269. TFunc Func;
  270. TReceiver<TP> Receiver;
  271. TOperand<TP, TParam> Param;
  272. bool DoExecute(TOrbit&, const TParams& params) override {
  273. Func(Receiver, Param.Get(params));
  274. return true;
  275. }
  276. public:
  277. TBinaryInplaceStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TParam>& param)
  278. : Receiver(receiver)
  279. , Param(param)
  280. {
  281. }
  282. };
  283. template <class TP, class TFunc, EOperandType TFirstParam>
  284. class TBinaryStatementExecutor: public IExecutor {
  285. private:
  286. TFunc Func;
  287. TReceiver<TP> Receiver;
  288. TOperand<TP, TFirstParam> FirstParam;
  289. bool DoExecute(TOrbit&, const TParams& params) override {
  290. Receiver.Set(Func(Receiver.Get(params), FirstParam.Get(params)));
  291. return true;
  292. }
  293. public:
  294. TBinaryStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TFirstParam>& firstParam)
  295. : Receiver(receiver)
  296. , FirstParam(firstParam)
  297. {
  298. }
  299. };
  300. template <class TP, class TFunc>
  301. class TTernaryStatementExecutor: public IExecutor {
  302. private:
  303. TFunc Func;
  304. TReceiver<TP> Receiver;
  305. TAutoPtr<IOperandGetter<TP>> FirstParam;
  306. TAutoPtr<IOperandGetter<TP>> SecondParam;
  307. bool DoExecute(TOrbit&, const TParams& params) override {
  308. Receiver.Set(Func(FirstParam->Get(params), SecondParam->Get(params)));
  309. return true;
  310. }
  311. public:
  312. TTernaryStatementExecutor(const TReceiver<TP>& receiver,
  313. TAutoPtr<IOperandGetter<TP>> firstParam,
  314. TAutoPtr<IOperandGetter<TP>> secondParam)
  315. : Receiver(receiver)
  316. , FirstParam(firstParam)
  317. , SecondParam(secondParam)
  318. {
  319. }
  320. };
  321. template <class TLog>
  322. class TLogActionExecutor: public IExecutor {
  323. private:
  324. bool LogParams;
  325. bool LogTimestamp;
  326. intptr_t* MaxRecords;
  327. TAtomic Records;
  328. TProbe* Probe;
  329. TLog* Log;
  330. bool DoExecute(TOrbit&, const TParams& params) override {
  331. if (MaxRecords != nullptr) {
  332. while (true) {
  333. intptr_t a = AtomicGet(Records);
  334. if (a >= *MaxRecords) {
  335. return true;
  336. }
  337. if (AtomicCas(&Records, a + 1, a)) {
  338. Write(params);
  339. return true;
  340. }
  341. }
  342. } else {
  343. Write(params);
  344. return true;
  345. }
  346. }
  347. void Write(const TParams& params) {
  348. typename TLog::TAccessor la(*Log);
  349. if (typename TLog::TItem* item = la.Add()) {
  350. item->Probe = Probe;
  351. if (LogParams) {
  352. if ((item->SavedParamsCount = Probe->Event.Signature.ParamCount) > 0) {
  353. Probe->Event.Signature.CloneParams(item->Params, params);
  354. }
  355. } else {
  356. item->SavedParamsCount = 0;
  357. }
  358. if (LogTimestamp) {
  359. item->Timestamp = TInstant::Now();
  360. }
  361. item->TimestampCycles = GetCycleCount();
  362. }
  363. }
  364. public:
  365. TLogActionExecutor(TProbe* probe, const TLogAction& action, TLog* log)
  366. : LogParams(!action.GetDoNotLogParams())
  367. , LogTimestamp(action.GetLogTimestamp())
  368. , MaxRecords(action.GetMaxRecords() ? new intptr_t(action.GetMaxRecords()) : nullptr)
  369. , Records(0)
  370. , Probe(probe)
  371. , Log(log)
  372. {
  373. }
  374. ~TLogActionExecutor() override {
  375. delete MaxRecords;
  376. }
  377. };
  378. class TSamplingExecutor: public IExecutor {
  379. private:
  380. double SampleRate;
  381. public:
  382. explicit TSamplingExecutor(double sampleRate)
  383. : SampleRate(sampleRate)
  384. {}
  385. bool DoExecute(TOrbit&, const TParams&) override {
  386. return RandomNumber<double>() < SampleRate;
  387. }
  388. };
  389. typedef struct {
  390. EOperandType Type;
  391. size_t ParamIdx;
  392. } TArgumentDescription;
  393. using TArgumentList = TVector<TArgumentDescription>;
  394. template <class T>
  395. void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) {
  396. arguments.clear();
  397. size_t firstParamIdx = size_t(-1);
  398. for (size_t argumentIdx = 0; argumentIdx < op.ArgumentSize(); ++argumentIdx) {
  399. const TArgument& arg = op.GetArgument(argumentIdx);
  400. TArgumentDescription operand;
  401. operand.ParamIdx = size_t(-1);
  402. if (arg.GetVariable()) {
  403. operand.Type = OT_VARIABLE;
  404. } else if (arg.GetValue()) {
  405. operand.Type = OT_LITERAL;
  406. } else if (arg.GetParam()) {
  407. operand.Type = OT_PARAMETER;
  408. operand.ParamIdx = signature.FindParamIndex(arg.GetParam());
  409. if (operand.ParamIdx == size_t(-1)) {
  410. ythrow yexception() << exceptionPrefix
  411. << " argument #" << argumentIdx << " param '" << arg.GetParam()
  412. << "' doesn't exist";
  413. }
  414. if (firstParamIdx == size_t(-1)) {
  415. firstParamIdx = operand.ParamIdx;
  416. } else {
  417. if (strcmp(signature.ParamTypes[firstParamIdx], signature.ParamTypes[operand.ParamIdx]) != 0) {
  418. ythrow yexception() << exceptionPrefix
  419. << " param types do not match";
  420. }
  421. }
  422. } else {
  423. ythrow yexception() << exceptionPrefix
  424. << " argument #" << argumentIdx
  425. << " is empty";
  426. }
  427. arguments.push_back(operand);
  428. }
  429. if (arguments.size() != expectedArgumentCount) {
  430. ythrow yexception() << exceptionPrefix
  431. << " incorrect number of arguments (" << arguments.size()
  432. << " present, " << expectedArgumentCount << " expected)";
  433. }
  434. }
  // Binary functor that ignores its first argument and returns the second one
  // converted to TArg1 (a replacement for the deprecated std::project2nd).
  template <class TArg1, class TArg2>
  struct TTraceSecondArg {
      TArg1 operator()(const TArg1& /*ignored*/, const TArg2& second) const {
          return second;
      }
  };
  442. void TSession::InsertExecutor(
  443. TTraceVariables& traceVariables, size_t bi, const TPredicate* pred,
  444. const NProtoBuf::RepeatedPtrField<TAction>& actions, TProbe* probe,
  445. const bool destructiveActionsAllowed,
  446. const TCustomActionFactory& customActionFactory) {
  447. #ifndef LWTRACE_DISABLE
  448. THolder<IExecutor> exec;
  449. IExecutor* last = nullptr;
  450. TArgumentList arguments;
  451. if (pred) {
  452. double sampleRate = pred->GetSampleRate();
  453. if (sampleRate != 0.0) {
  454. if (!(0.0 < sampleRate && sampleRate <= 1.0)) {
  455. ythrow yexception() << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " sampling operator"
  456. << " invalid sample rate " << sampleRate << ", expected [0;1]";
  457. }
  458. exec.Reset(new TSamplingExecutor(sampleRate));
  459. last = exec.Get();
  460. }
  461. for (size_t i = 0; i < pred->OperatorsSize(); i++) {
  462. const TOperator& op = pred->GetOperators(i);
  463. TString exceptionPrefix;
  464. TStringOutput exceptionPrefixOutput(exceptionPrefix);
  465. exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " operator #" << i + 1;
  466. ParseArguments<TOperator>(op, probe->Event.Signature, exceptionPrefix, 2, arguments);
  467. THolder<IExecutor> opExec;
  468. TArgumentDescription arg0 = arguments.at(0);
  469. TArgumentDescription arg1 = arguments.at(1);
  470. const char* tName0 = arg0.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg0.ParamIdx];
  471. const char* tName1 = arg1.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
  472. TString var0 = op.GetArgument(0).GetVariable();
  473. TString var1 = op.GetArgument(1).GetVariable();
  474. TString val0 = op.GetArgument(0).GetValue();
  475. TString val1 = op.GetArgument(1).GetValue();
  476. #define FOREACH_OPERAND_TYPE_RT(n, t, v, fn, lt, rt) \
  477. if (rt == arg1.Type) { \
  478. TOperand<t, rt> rhs(traceVariables, var1, val1, arg1.ParamIdx); \
  479. opExec.Reset(new TOperatorExecutor<t, fn<t>, lt, rt>(lhs, rhs, invertCompare)); \
  480. break; \
  481. }
  482. #define FOREACH_OPERAND_TYPE_LT(n, t, v, fn, lt) \
  483. if (lt == arg0.Type) { \
  484. TOperand<t, lt> lhs(traceVariables, var0, val0, arg0.ParamIdx); \
  485. FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt) \
  486. }
  487. #define FOREACH_PARAMTYPE_MACRO(n, t, v, fn) \
  488. if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \
  489. FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn); \
  490. }
  491. bool invertCompare = EqualToOneOf(op.GetType(), OT_NE, OT_GE, OT_LE);
  492. switch (op.GetType()) {
  493. case OT_EQ:
  494. case OT_NE:
  495. FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::equal_to);
  496. break;
  497. case OT_LT:
  498. case OT_GE:
  499. FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::less);
  500. break;
  501. case OT_GT:
  502. case OT_LE:
  503. FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::greater);
  504. break;
  505. default:
  506. ythrow yexception() << exceptionPrefix
  507. << " has not supported operator type #" << int(op.GetType());
  508. }
  509. #undef FOREACH_OPERAND_TYPE_RT
  510. #undef FOREACH_OPERAND_TYPE_LT
  511. #undef FOREACH_PARAMTYPE_MACRO
  512. if (!opExec) {
  513. ythrow yexception() << exceptionPrefix
  514. << " has not supported left param #" << arg0.ParamIdx + 1 << " type '"
  515. << (arg0.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg0.ParamIdx] : "?")
  516. << "', or right param #" << arg0.ParamIdx + 1 << " type '"
  517. << (arg1.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg1.ParamIdx] : "?")
  518. << "'";
  519. }
  520. if (!exec) {
  521. exec.Reset(opExec.Release());
  522. last = exec.Get();
  523. } else {
  524. last->SetNext(opExec.Release());
  525. last = last->GetNext();
  526. }
  527. }
  528. }
  529. for (int i = 0; i < actions.size(); ++i) {
  530. const TAction& action = actions.Get(i);
  531. THolder<IExecutor> actExec;
  532. if (action.HasPrintToStderrAction()) {
  533. actExec.Reset(new TStderrActionExecutor(probe));
  534. } else if (action.HasLogAction()) {
  535. if (Query.GetLogDurationUs()) {
  536. actExec.Reset(new TLogActionExecutor<TDurationLog>(probe, action.GetLogAction(), &DurationLog));
  537. } else {
  538. actExec.Reset(new TLogActionExecutor<TCyclicLog>(probe, action.GetLogAction(), &CyclicLog));
  539. }
  540. } else if (action.HasRunLogShuttleAction()) {
  541. if (Query.GetLogDurationUs()) {
  542. actExec.Reset(new TRunLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetRunLogShuttleAction(), &DurationDepot, &LastTrackId, &LastSpanId));
  543. } else {
  544. actExec.Reset(new TRunLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetRunLogShuttleAction(), &CyclicDepot, &LastTrackId, &LastSpanId));
  545. }
  546. } else if (action.HasEditLogShuttleAction()) {
  547. if (Query.GetLogDurationUs()) {
  548. actExec.Reset(new TEditLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetEditLogShuttleAction()));
  549. } else {
  550. actExec.Reset(new TEditLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetEditLogShuttleAction()));
  551. }
  552. } else if (action.HasDropLogShuttleAction()) {
  553. if (Query.GetLogDurationUs()) {
  554. actExec.Reset(new TDropLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetDropLogShuttleAction()));
  555. } else {
  556. actExec.Reset(new TDropLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetDropLogShuttleAction()));
  557. }
  558. } else if (action.HasCustomAction()) {
  559. THolder<TCustomActionExecutor> customExec(customActionFactory.Create(probe, action.GetCustomAction(), this));
  560. if (customExec) {
  561. if (!customExec->IsDestructive() || destructiveActionsAllowed) {
  562. actExec.Reset(customExec.Release());
  563. } else {
  564. ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
  565. << " contains destructive CustomAction, but destructive actions are disabled."
  566. << " Please, consider using --unsafe-lwtrace command line parameter.";
  567. }
  568. } else {
  569. ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
  570. << " contains unregistered CustomAction '" << action.GetCustomAction().GetName() << "'";
  571. }
  572. } else if (action.HasKillAction()) {
  573. if (destructiveActionsAllowed) {
  574. actExec.Reset(new NPrivate::TKillActionExecutor(probe));
  575. } else {
  576. ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
  577. << " contains destructive KillAction, but destructive actions are disabled."
  578. << " Please, consider using --unsafe-lwtrace command line parameter.";
  579. }
  580. } else if (action.HasSleepAction()) {
  581. if (destructiveActionsAllowed) {
  582. const TSleepAction& sleepAction = action.GetSleepAction();
  583. if (sleepAction.GetNanoSeconds()) {
  584. ui64 nanoSeconds = sleepAction.GetNanoSeconds();
  585. actExec.Reset(new NPrivate::TSleepActionExecutor(probe, nanoSeconds));
  586. } else {
  587. ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
  588. << " SleepAction missing parameter 'NanoSeconds'";
  589. }
  590. } else {
  591. ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
  592. << " contains destructive SleepAction, but destructive actions are disabled."
  593. << " Please, consider using --unsafe-lwtrace command line parameter.";
  594. }
  595. } else if (action.HasStatementAction()) {
  596. const TStatementAction& statement = action.GetStatementAction();
  597. TString exceptionPrefix;
  598. TStringOutput exceptionPrefixOutput(exceptionPrefix);
  599. exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " action #" << i + 1;
  600. size_t expectedArgumentCount = 3;
  601. if (statement.GetType() == ST_MOV || statement.GetType() == ST_ADD_EQ || statement.GetType() == ST_SUB_EQ) {
  602. expectedArgumentCount = 2;
  603. } else if (statement.GetType() == ST_INC || statement.GetType() == ST_DEC) {
  604. expectedArgumentCount = 1;
  605. }
  606. ParseArguments<TStatementAction>(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments);
  607. TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0);
  608. TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1);
  609. TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2);
  610. TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable();
  611. TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable();
  612. TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable();
  613. TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue();
  614. TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue();
  615. TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue();
  616. const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1))
  617. ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
  618. const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1))
  619. ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx];
  620. if (arg0.Type == OT_VARIABLE) {
  621. switch (statement.GetType()) {
  622. #define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \
  623. { \
  624. typedef TUnaryInplaceStatementExecutor<t, fn<TReceiver<t>>> TExec; \
  625. TReceiver<t> receiver(traceVariables, var0); \
  626. actExec.Reset(new TExec(receiver)); \
  627. break; \
  628. }
  629. #define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft) \
  630. if (arg1.Type == ft) { \
  631. typedef TBinaryInplaceStatementExecutor<t, fn<TReceiver<t>, t>, ft> TExec; \
  632. TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
  633. actExec.Reset(new TExec(receiver, firstParam)); \
  634. break; \
  635. }
  636. #define PARSE_BINARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \
  637. if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \
  638. TReceiver<t> receiver(traceVariables, var0); \
  639. FOREACH_RIGHT_TYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO2, n, t, v, fn); \
  640. }
  641. #define PARSE_BINARY_STATEMENT_MACRO2(n, t, v, fn, ft) \
  642. if (arg1.Type == ft) { \
  643. typedef TBinaryStatementExecutor<t, fn<t, t>, ft> TExec; \
  644. TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
  645. actExec.Reset(new TExec(receiver, firstParam)); \
  646. break; \
  647. }
  648. #define PARSE_BINARY_STATEMENT_MACRO(n, t, v, fn) \
  649. if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \
  650. TReceiver<t> receiver(traceVariables, var0); \
  651. FOREACH_RIGHT_TYPE(PARSE_BINARY_STATEMENT_MACRO2, n, t, v, fn); \
  652. }
  653. #define CREATE_OPERAND_GETTER_N(N, type, arg_type) \
  654. if (arg##N.Type == arg_type) { \
  655. operand##N.Reset(new TOperandGetter<type, arg_type>(TOperand<type, arg_type>(traceVariables, var##N, val##N, arg##N.ParamIdx))); \
  656. }
  657. #define TERNARY_ON_TYPE(n, t, v, fn) \
  658. if ((arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) && (arg2.ParamIdx == size_t(-1) || strcmp(tName2, n) == 0)) { \
  659. TAutoPtr<IOperandGetter<t>> operand1, operand2; \
  660. FOREACH_LEFT_TYPE(CREATE_OPERAND_GETTER_N, 1, t); \
  661. FOREACH_RIGHT_TYPE(CREATE_OPERAND_GETTER_N, 2, t); \
  662. if (operand1 && operand2) { \
  663. actExec.Reset(new TTernaryStatementExecutor<t, fn<t>>( \
  664. TReceiver<t>(traceVariables, var0), \
  665. operand1, \
  666. operand2)); \
  667. } \
  668. break; \
  669. }
  670. #define IMPLEMENT_TERNARY_STATEMENT(fn) FOR_MATH_PARAMTYPE(TERNARY_ON_TYPE, fn)
  671. case ST_INC:
  672. FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TInc);
  673. break;
  674. case ST_DEC:
  675. FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TDec);
  676. break;
  677. case ST_MOV:
  678. FOR_MATH_PARAMTYPE(PARSE_BINARY_STATEMENT_MACRO, TTraceSecondArg);
  679. break;
  680. case ST_ADD_EQ:
  681. FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TAddEq);
  682. break;
  683. case ST_SUB_EQ:
  684. FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TSubEq);
  685. break;
  686. case ST_ADD:
  687. IMPLEMENT_TERNARY_STATEMENT(std::plus)
  688. break;
  689. case ST_SUB:
  690. IMPLEMENT_TERNARY_STATEMENT(std::minus)
  691. break;
  692. case ST_MUL:
  693. IMPLEMENT_TERNARY_STATEMENT(std::multiplies)
  694. break;
  695. case ST_DIV:
  696. IMPLEMENT_TERNARY_STATEMENT(std::divides)
  697. break;
  698. case ST_MOD:
  699. IMPLEMENT_TERNARY_STATEMENT(std::modulus)
  700. break;
  701. default:
  702. ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
  703. << " has not supported statement type #" << int(statement.GetType());
  704. }
  705. }
  706. if (!actExec) {
  707. ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
  708. << " can't create action";
  709. }
  710. #undef CREATE_OPERAND_GETTER_N
  711. #undef TERNARY_ON_TYPE
  712. #undef IMPLEMENT_TERNARY_STATEMENT
  713. #undef PARSE_TERNARY_STATEMENT_MACRO
  714. #undef PARSE_BINARY_STATEMENT_MACRO
  715. #undef PARSE_BINARY_INPLACE_STATEMENT_MACRO
  716. #undef PARSE_UNARY_INPLACE_STATEMENT_MACRO
  717. } else {
  718. ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
  719. << " has not supported action '" << action.ShortDebugString() << "'";
  720. }
  721. if (!exec) {
  722. exec.Reset(actExec.Release());
  723. last = exec.Get();
  724. } else {
  725. last->SetNext(actExec.Release());
  726. last = last->GetNext();
  727. }
  728. }
  729. if (!probe->Attach(exec.Get())) {
  730. ythrow yexception() << "block #" << bi + 1
  731. << " cannot be attached to probe '" << probe->Event.Name << "': no free slots";
  732. }
  733. Probes.push_back(std::make_pair(probe, exec.Release()));
  734. #else
  735. Y_UNUSED(bi);
  736. Y_UNUSED(pred);
  737. Y_UNUSED(actions);
  738. Y_UNUSED(probe);
  739. Y_UNUSED(destructiveActionsAllowed);
  740. Y_UNUSED(traceVariables);
  741. Y_UNUSED(customActionFactory);
  742. #endif
  743. }
  744. TSession::TSession(ui64 traceIdx,
  745. TProbeRegistry& registry,
  746. const TQuery& query,
  747. const bool destructiveActionsAllowed,
  748. const TCustomActionFactory& customActionFactory)
  749. : StartTime(TInstant::Now())
  750. , TraceIdx(traceIdx)
  751. , Registry(registry)
  752. , StoreDuration(TDuration::MicroSeconds(query.GetLogDurationUs() * 11 / 10)) // +10% to try avoid truncation while reading multiple threads/traces
  753. , ReadDuration(TDuration::MicroSeconds(query.GetLogDurationUs()))
  754. , CyclicLog(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
  755. , DurationLog(StoreDuration)
  756. , CyclicDepot(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
  757. , DurationDepot(StoreDuration)
  758. , LastTrackId(0)
  759. , LastSpanId(0)
  760. , Attached(true)
  761. , Query(query)
  762. {
  763. try {
  764. for (size_t bi = 0; bi < query.BlocksSize(); bi++) {
  765. const TBlock& block = query.GetBlocks(bi);
  766. if (!block.HasProbeDesc()) {
  767. ythrow yexception() << "block #" << bi + 1 << " has no probe description";
  768. }
  769. const TProbeDesc& pdesc = block.GetProbeDesc();
  770. const TPredicate* pred = block.HasPredicate() ? &block.GetPredicate() : nullptr;
  771. if (block.ActionSize() < 1) {
  772. ythrow yexception() << "block #" << bi + 1 << " has no action";
  773. }
  774. const NProtoBuf::RepeatedPtrField<TAction>& actions = block.action();
  775. if (pdesc.GetName() && pdesc.GetProvider()) {
  776. TProbeRegistry::TProbesAccessor probes(Registry);
  777. bool found = false;
  778. for (auto& kv : probes) {
  779. TProbe* probe = kv.first;
  780. if (probe->Event.Name == pdesc.GetName() && probe->Event.GetProvider() == pdesc.GetProvider()) {
  781. InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory);
  782. found = true;
  783. break;
  784. }
  785. }
  786. if (!found) {
  787. ythrow yexception() << "block #" << bi + 1 << " has no matching probe with name '"
  788. << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "'";
  789. }
  790. } else if (pdesc.GetGroup()) {
  791. bool found = false;
  792. TProbeRegistry::TProbesAccessor probes(Registry);
  793. for (auto& kv : probes) {
  794. TProbe* probe = kv.first;
  795. for (const char* const* gi = probe->Event.Groups; *gi != nullptr; gi++) {
  796. if (*gi == pdesc.GetGroup()) {
  797. InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory);
  798. found = true;
  799. break;
  800. }
  801. }
  802. }
  803. if (!found) {
  804. ythrow yexception() << "block #" << bi + 1
  805. << " has no matching probes for group '" << pdesc.GetGroup() << "'";
  806. }
  807. } else {
  808. ythrow yexception() << "block #" << bi + 1 << " has bad probe description: name '" << pdesc.GetName()
  809. << "' provider '" << pdesc.GetProvider()
  810. << "' group '" << pdesc.GetGroup() << "'";
  811. }
  812. }
  813. } catch (...) {
  814. Destroy();
  815. throw;
  816. }
  817. }
  818. void TSession::Destroy() {
  819. Detach();
  820. for (auto& probe : Probes) {
  821. delete probe.second;
  822. }
  823. }
  824. TSession::~TSession() {
  825. Destroy();
  826. }
  827. void TSession::Detach() {
  828. if (Attached) {
  829. for (auto& p : Probes) {
  830. TProbe* probe = p.first;
  831. IExecutor* exec = p.second;
  832. probe->Detach(exec);
  833. }
  834. Attached = false;
  835. }
  836. }
  837. size_t TSession::GetEventsCount() const {
  838. return CyclicLog.GetEventsCount() + DurationLog.GetEventsCount() + CyclicDepot.GetEventsCount() + DurationDepot.GetEventsCount();
  839. }
  840. size_t TSession::GetThreadsCount() const {
  841. return CyclicLog.GetThreadsCount() + DurationLog.GetThreadsCount() + CyclicDepot.GetThreadsCount() + DurationDepot.GetThreadsCount();
  842. }
  843. class TReadToProtobuf {
  844. private:
  845. TMap<TThread::TId, TVector<TLogItem>> Items;
  846. public:
  847. void ToProtobuf(TLogPb& pb) const {
  848. TSet<TProbe*> probes;
  849. ui64 eventsCount = 0;
  850. for (auto kv : Items) {
  851. TThreadLogPb* tpb = pb.AddThreadLogs();
  852. tpb->SetThreadId(kv.first);
  853. for (TLogItem& item : kv.second) {
  854. item.ToProtobuf(*tpb->AddLogItems());
  855. probes.insert(item.Probe);
  856. eventsCount++;
  857. }
  858. }
  859. pb.SetEventsCount(eventsCount);
  860. for (TProbe* probe : probes) {
  861. probe->Event.ToProtobuf(*pb.AddEvents());
  862. }
  863. }
  864. void Push(TThread::TId tid, const TLogItem& item) {
  865. // Avoid any expansive operations in Push(), because it executes under lock and blocks program being traced
  866. Items[tid].push_back(item);
  867. }
  868. };
  869. void TSession::ToProtobuf(TLogPb& pb) const {
  870. TReadToProtobuf reader;
  871. ReadItems(reader);
  872. reader.ToProtobuf(pb);
  873. pb.MutableQuery()->CopyFrom(Query);
  874. pb.SetCrtTime(TInstant::Now().GetValue());
  875. }
  876. TManager::TManager(TProbeRegistry& registry, bool allowDestructiveActions)
  877. : Registry(registry)
  878. , DestructiveActionsAllowed(allowDestructiveActions)
  879. , SerializingExecutor(new TRunLogShuttleActionExecutor<TCyclicDepot>(0, {}, nullptr, nullptr, nullptr))
  880. {
  881. }
  882. TManager::~TManager() {
  883. for (auto& trace : Traces) {
  884. delete trace.second;
  885. }
  886. }
  887. bool TManager::HasTrace(const TString& id) const {
  888. TGuard<TMutex> g(Mtx);
  889. return Traces.contains(id);
  890. }
  891. const TSession* TManager::GetTrace(const TString& id) const {
  892. TGuard<TMutex> g(Mtx);
  893. TTraces::const_iterator it = Traces.find(id);
  894. if (it == Traces.end()) {
  895. ythrow yexception() << "trace id '" << id << "' is not used";
  896. } else {
  897. return it->second;
  898. }
  899. }
  900. void TManager::New(const TString& id, const TQuery& query) {
  901. TGuard<TMutex> g(Mtx);
  902. if (Traces.find(id) == Traces.end()) {
  903. TSession* trace = new TSession(++LastTraceIdx, Registry, query, GetDestructiveActionsAllowed(), CustomActionFactory);
  904. Traces[id] = trace;
  905. } else {
  906. ythrow yexception() << "trace id '" << id << "' is already used";
  907. }
  908. }
  909. void TManager::Delete(const TString& id) {
  910. TGuard<TMutex> g(Mtx);
  911. TTraces::iterator it = Traces.find(id);
  912. if (it == Traces.end()) {
  913. ythrow yexception() << "trace id '" << id << "' is not used";
  914. } else {
  915. delete it->second;
  916. Traces.erase(it);
  917. }
  918. }
  919. void TManager::Stop(const TString& id) {
  920. TGuard<TMutex> g(Mtx);
  921. TTraces::iterator it = Traces.find(id);
  922. if (it == Traces.end()) {
  923. ythrow yexception() << "trace id '" << id << "' is not used";
  924. } else {
  925. it->second->Detach();
  926. }
  927. }
  928. }