#include "all.h" #include "kill_action.h" #include "log_shuttle.h" #include "preprocessor.h" #include "sleep_action.h" #include "stderr_writer.h" #include "google/protobuf/repeated_field.h" #include #include #include namespace NLWTrace { #ifndef LWTRACE_DISABLE // Define static strings for name of each parameter type #define FOREACH_PARAMTYPE_MACRO(n, t, v) \ const char* TParamType::NameString = n; \ /**/ FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO) FOR_NIL_PARAMTYPE(FOREACH_PARAMTYPE_MACRO) #undef FOREACH_PARAMTYPE_MACRO #endif void TProbeRegistry::AddProbesList(TProbe** reg) { TGuard g(Mutex); if (reg == nullptr) { return; } for (TProbe** i = reg; *i != nullptr; i++) { AddProbeNoLock(new TStaticBox(*i)); } } void TProbeRegistry::AddProbe(const TBoxPtr& box) { TGuard g(Mutex); AddProbeNoLock(box); } void TProbeRegistry::RemoveProbe(TProbe* probe) { TGuard g(Mutex); RemoveProbeNoLock(probe); } void TProbeRegistry::AddProbeNoLock(const TBoxPtr& box) { TProbe* probe = box->GetProbe(); if (Probes.contains(probe)) { return; // silently skip probe double registration } TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name); Y_ABORT_UNLESS(Ids.count(key) == 0, "duplicate provider:probe pair %s:%s", key.first.data(), key.second.data()); Probes.emplace(probe, box); Ids.insert(key); } void TProbeRegistry::RemoveProbeNoLock(TProbe* probe) { auto iter = Probes.find(probe); if (iter != Probes.end()) { TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name); Ids.erase(key); Probes.erase(iter); } else { // silently skip probe double unregistration } } TAtomic* GetVariablePtr(TSession::TTraceVariables& traceVariables, const TString& name) { TSession::TTraceVariables::iterator it = traceVariables.find(name); if (it == traceVariables.end()) { TAtomicBase zero = 0; traceVariables[name] = zero; return &traceVariables[name]; } return &((*it).second); } typedef enum { OT_LITERAL = 0, OT_PARAMETER = 1, OT_VARIABLE = 2 } EOperandType; template class TOperand; template class TOperand { private: T ImmediateValue; public: TOperand(TSession::TTraceVariables&, const TString&, const TString& value, size_t) { ImmediateValue = TParamConv::FromString(value); } const T& Get(const TParams&) { return ImmediateValue; } }; template class TOperand { private: size_t Idx; public: TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t idx) { Idx = idx; } const T& Get(const TParams& params) { return params.Param[Idx].template Get(); } }; template class TOperand { private: TAtomic* Variable; public: TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) { Variable = GetVariablePtr(traceVariables, name); } const T Get(const TParams&) { return (T)AtomicGet(*Variable); } void Set(const T& value) { AtomicSet(*Variable, value); } void Inc() { AtomicIncrement(*Variable); } void Dec() { AtomicDecrement(*Variable); } void Add(const TAtomicBase value) { AtomicAdd(*Variable, value); } void Sub(const TAtomicBase value) { AtomicSub(*Variable, value); } }; template <> class TOperand { private: TAtomic* Variable; public: TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) { Variable = GetVariablePtr(traceVariables, name); } const TCheck Get(const TParams&) { return TCheck(AtomicGet(*Variable)); } void Set(const TCheck& value) { AtomicSet(*Variable, value.Value); } void Add(const TCheck& value) { AtomicAdd(*Variable, value.Value); } void Sub(const TCheck value) { AtomicSub(*Variable, value.Value); } void Inc() { AtomicIncrement(*Variable); } void Dec() { AtomicDecrement(*Variable); } }; template <> class TOperand { private: TString Dummy; public: TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) { } const TString Get(const TParams&) { return Dummy; } void Set(const TString&) { } }; template <> class TOperand { private: TSymbol Dummy; public: TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) { } const TSymbol Get(const TParams&) { return Dummy; } void Set(const TSymbol&) { } }; // IOperandGetter: hide concrete EOperandType, to save compilation time template struct IOperandGetter { virtual const T Get(const TParams& params) = 0; virtual ~IOperandGetter() { } }; template class TOperandGetter: public IOperandGetter { private: TOperand Op; public: TOperandGetter(const TOperand& op) : Op(op) { } const T Get(const TParams& params) override { return Op.Get(params); } }; template class TReceiver: public TOperand { public: TReceiver(TSession::TTraceVariables& traceVariables, const TString& name) : TOperand(traceVariables, name, {}, 0) { } }; template static bool CmpFunc(TP a, TP b) { return TPredicate()(a, b); } template class TOperatorExecutor: public IExecutor { private: bool InvertCompare; TOperand Lhs; TOperand Rhs; bool DoExecute(TOrbit&, const TParams& params) override { return TFunc()(Lhs.Get(params), Rhs.Get(params)) != InvertCompare; } public: TOperatorExecutor(const TOperand& lhs, const TOperand& rhs, bool invertCompare) : InvertCompare(invertCompare) , Lhs(lhs) , Rhs(rhs) { } }; template struct TAddEq { void operator()(TR& x, TP y) const { x.Add(y); } }; template struct TSubEq { void operator()(TR& x, TP y) const { x.Sub(y); } }; template struct TInc { void operator()(TR& x) const { x.Inc(); } }; template struct TDec { void operator()(TR& x) const { x.Dec(); } }; template class TUnaryInplaceStatementExecutor: public IExecutor { private: TFunc Func; TReceiver Receiver; bool DoExecute(TOrbit&, const TParams&) override { Func(Receiver); return true; } public: TUnaryInplaceStatementExecutor(TReceiver& receiver) : Receiver(receiver) { } }; template class TBinaryInplaceStatementExecutor: public IExecutor { private: TFunc Func; TReceiver Receiver; TOperand Param; bool DoExecute(TOrbit&, const TParams& params) override { Func(Receiver, Param.Get(params)); return true; } public: TBinaryInplaceStatementExecutor(TReceiver& receiver, const TOperand& param) : Receiver(receiver) , Param(param) { } }; template class TBinaryStatementExecutor: public IExecutor { private: TFunc Func; TReceiver Receiver; TOperand FirstParam; bool DoExecute(TOrbit&, const TParams& params) override { Receiver.Set(Func(Receiver.Get(params), FirstParam.Get(params))); return true; } public: TBinaryStatementExecutor(TReceiver& receiver, const TOperand& firstParam) : Receiver(receiver) , FirstParam(firstParam) { } }; template class TTernaryStatementExecutor: public IExecutor { private: TFunc Func; TReceiver Receiver; TAutoPtr> FirstParam; TAutoPtr> SecondParam; bool DoExecute(TOrbit&, const TParams& params) override { Receiver.Set(Func(FirstParam->Get(params), SecondParam->Get(params))); return true; } public: TTernaryStatementExecutor(const TReceiver& receiver, TAutoPtr> firstParam, TAutoPtr> secondParam) : Receiver(receiver) , FirstParam(firstParam) , SecondParam(secondParam) { } }; template class TLogActionExecutor: public IExecutor { private: bool LogParams; bool LogTimestamp; intptr_t* MaxRecords; TAtomic Records; TProbe* Probe; TLog* Log; bool DoExecute(TOrbit&, const TParams& params) override { if (MaxRecords != nullptr) { while (true) { intptr_t a = AtomicGet(Records); if (a >= *MaxRecords) { return true; } if (AtomicCas(&Records, a + 1, a)) { Write(params); return true; } } } else { Write(params); return true; } } void Write(const TParams& params) { typename TLog::TAccessor la(*Log); if (typename TLog::TItem* item = la.Add()) { item->Probe = Probe; if (LogParams) { if ((item->SavedParamsCount = Probe->Event.Signature.ParamCount) > 0) { Probe->Event.Signature.CloneParams(item->Params, params); } } else { item->SavedParamsCount = 0; } if (LogTimestamp) { item->Timestamp = TInstant::Now(); } item->TimestampCycles = GetCycleCount(); } } public: TLogActionExecutor(TProbe* probe, const TLogAction& action, TLog* log) : LogParams(!action.GetDoNotLogParams()) , LogTimestamp(action.GetLogTimestamp()) , MaxRecords(action.GetMaxRecords() ? new intptr_t(action.GetMaxRecords()) : nullptr) , Records(0) , Probe(probe) , Log(log) { } ~TLogActionExecutor() override { delete MaxRecords; } }; class TSamplingExecutor: public IExecutor { private: double SampleRate; public: explicit TSamplingExecutor(double sampleRate) : SampleRate(sampleRate) {} bool DoExecute(TOrbit&, const TParams&) override { return RandomNumber() < SampleRate; } }; typedef struct { EOperandType Type; size_t ParamIdx; } TArgumentDescription; using TArgumentList = TVector; template void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) { arguments.clear(); size_t firstParamIdx = size_t(-1); for (size_t argumentIdx = 0; argumentIdx < op.ArgumentSize(); ++argumentIdx) { const TArgument& arg = op.GetArgument(argumentIdx); TArgumentDescription operand; operand.ParamIdx = size_t(-1); if (arg.GetVariable()) { operand.Type = OT_VARIABLE; } else if (arg.GetValue()) { operand.Type = OT_LITERAL; } else if (arg.GetParam()) { operand.Type = OT_PARAMETER; operand.ParamIdx = signature.FindParamIndex(arg.GetParam()); if (operand.ParamIdx == size_t(-1)) { ythrow yexception() << exceptionPrefix << " argument #" << argumentIdx << " param '" << arg.GetParam() << "' doesn't exist"; } if (firstParamIdx == size_t(-1)) { firstParamIdx = operand.ParamIdx; } else { if (strcmp(signature.ParamTypes[firstParamIdx], signature.ParamTypes[operand.ParamIdx]) != 0) { ythrow yexception() << exceptionPrefix << " param types do not match"; } } } else { ythrow yexception() << exceptionPrefix << " argument #" << argumentIdx << " is empty"; } arguments.push_back(operand); } if (arguments.size() != expectedArgumentCount) { ythrow yexception() << exceptionPrefix << " incorrect number of arguments (" << arguments.size() << " present, " << expectedArgumentCount << " expected)"; } } template struct TTraceSecondArg { // implementation of deprecated std::project2nd TArg1 operator()(const TArg1&, const TArg2& y) const { return y; } }; void TSession::InsertExecutor( TTraceVariables& traceVariables, size_t bi, const TPredicate* pred, const NProtoBuf::RepeatedPtrField& actions, TProbe* probe, const bool destructiveActionsAllowed, const TCustomActionFactory& customActionFactory) { #ifndef LWTRACE_DISABLE THolder exec; IExecutor* last = nullptr; TArgumentList arguments; if (pred) { double sampleRate = pred->GetSampleRate(); if (sampleRate != 0.0) { if (!(0.0 < sampleRate && sampleRate <= 1.0)) { ythrow yexception() << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " sampling operator" << " invalid sample rate " << sampleRate << ", expected [0;1]"; } exec.Reset(new TSamplingExecutor(sampleRate)); last = exec.Get(); } for (size_t i = 0; i < pred->OperatorsSize(); i++) { const TOperator& op = pred->GetOperators(i); TString exceptionPrefix; TStringOutput exceptionPrefixOutput(exceptionPrefix); exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " operator #" << i + 1; ParseArguments(op, probe->Event.Signature, exceptionPrefix, 2, arguments); THolder opExec; TArgumentDescription arg0 = arguments.at(0); TArgumentDescription arg1 = arguments.at(1); const char* tName0 = arg0.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg0.ParamIdx]; const char* tName1 = arg1.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx]; TString var0 = op.GetArgument(0).GetVariable(); TString var1 = op.GetArgument(1).GetVariable(); TString val0 = op.GetArgument(0).GetValue(); TString val1 = op.GetArgument(1).GetValue(); #define FOREACH_OPERAND_TYPE_RT(n, t, v, fn, lt, rt) \ if (rt == arg1.Type) { \ TOperand rhs(traceVariables, var1, val1, arg1.ParamIdx); \ opExec.Reset(new TOperatorExecutor, lt, rt>(lhs, rhs, invertCompare)); \ break; \ } #define FOREACH_OPERAND_TYPE_LT(n, t, v, fn, lt) \ if (lt == arg0.Type) { \ TOperand lhs(traceVariables, var0, val0, arg0.ParamIdx); \ FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt) \ } #define FOREACH_PARAMTYPE_MACRO(n, t, v, fn) \ if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \ FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn); \ } bool invertCompare = EqualToOneOf(op.GetType(), OT_NE, OT_GE, OT_LE); switch (op.GetType()) { case OT_EQ: case OT_NE: FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::equal_to); break; case OT_LT: case OT_GE: FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::less); break; case OT_GT: case OT_LE: FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::greater); break; default: ythrow yexception() << exceptionPrefix << " has not supported operator type #" << int(op.GetType()); } #undef FOREACH_OPERAND_TYPE_RT #undef FOREACH_OPERAND_TYPE_LT #undef FOREACH_PARAMTYPE_MACRO if (!opExec) { ythrow yexception() << exceptionPrefix << " has not supported left param #" << arg0.ParamIdx + 1 << " type '" << (arg0.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg0.ParamIdx] : "?") << "', or right param #" << arg0.ParamIdx + 1 << " type '" << (arg1.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg1.ParamIdx] : "?") << "'"; } if (!exec) { exec.Reset(opExec.Release()); last = exec.Get(); } else { last->SetNext(opExec.Release()); last = last->GetNext(); } } } for (int i = 0; i < actions.size(); ++i) { const TAction& action = actions.Get(i); THolder actExec; if (action.HasPrintToStderrAction()) { actExec.Reset(new TStderrActionExecutor(probe)); } else if (action.HasLogAction()) { if (Query.GetLogDurationUs()) { actExec.Reset(new TLogActionExecutor(probe, action.GetLogAction(), &DurationLog)); } else { actExec.Reset(new TLogActionExecutor(probe, action.GetLogAction(), &CyclicLog)); } } else if (action.HasRunLogShuttleAction()) { if (Query.GetLogDurationUs()) { actExec.Reset(new TRunLogShuttleActionExecutor(TraceIdx, action.GetRunLogShuttleAction(), &DurationDepot, &LastTrackId, &LastSpanId)); } else { actExec.Reset(new TRunLogShuttleActionExecutor(TraceIdx, action.GetRunLogShuttleAction(), &CyclicDepot, &LastTrackId, &LastSpanId)); } } else if (action.HasEditLogShuttleAction()) { if (Query.GetLogDurationUs()) { actExec.Reset(new TEditLogShuttleActionExecutor(TraceIdx, action.GetEditLogShuttleAction())); } else { actExec.Reset(new TEditLogShuttleActionExecutor(TraceIdx, action.GetEditLogShuttleAction())); } } else if (action.HasDropLogShuttleAction()) { if (Query.GetLogDurationUs()) { actExec.Reset(new TDropLogShuttleActionExecutor(TraceIdx, action.GetDropLogShuttleAction())); } else { actExec.Reset(new TDropLogShuttleActionExecutor(TraceIdx, action.GetDropLogShuttleAction())); } } else if (action.HasCustomAction()) { THolder customExec(customActionFactory.Create(probe, action.GetCustomAction(), this)); if (customExec) { if (!customExec->IsDestructive() || destructiveActionsAllowed) { actExec.Reset(customExec.Release()); } else { ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 << " contains destructive CustomAction, but destructive actions are disabled." << " Please, consider using --unsafe-lwtrace command line parameter."; } } else { ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 << " contains unregistered CustomAction '" << action.GetCustomAction().GetName() << "'"; } } else if (action.HasKillAction()) { if (destructiveActionsAllowed) { actExec.Reset(new NPrivate::TKillActionExecutor(probe)); } else { ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 << " contains destructive KillAction, but destructive actions are disabled." << " Please, consider using --unsafe-lwtrace command line parameter."; } } else if (action.HasSleepAction()) { if (destructiveActionsAllowed) { const TSleepAction& sleepAction = action.GetSleepAction(); if (sleepAction.GetNanoSeconds()) { ui64 nanoSeconds = sleepAction.GetNanoSeconds(); actExec.Reset(new NPrivate::TSleepActionExecutor(probe, nanoSeconds)); } else { ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 << " SleepAction missing parameter 'NanoSeconds'"; } } else { ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1 << " contains destructive SleepAction, but destructive actions are disabled." << " Please, consider using --unsafe-lwtrace command line parameter."; } } else if (action.HasStatementAction()) { const TStatementAction& statement = action.GetStatementAction(); TString exceptionPrefix; TStringOutput exceptionPrefixOutput(exceptionPrefix); exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " action #" << i + 1; size_t expectedArgumentCount = 3; if (statement.GetType() == ST_MOV || statement.GetType() == ST_ADD_EQ || statement.GetType() == ST_SUB_EQ) { expectedArgumentCount = 2; } else if (statement.GetType() == ST_INC || statement.GetType() == ST_DEC) { expectedArgumentCount = 1; } ParseArguments(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments); TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0); TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1); TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2); TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable(); TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable(); TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable(); TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue(); TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue(); TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue(); const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1)) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx]; const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1)) ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx]; if (arg0.Type == OT_VARIABLE) { switch (statement.GetType()) { #define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \ { \ typedef TUnaryInplaceStatementExecutor>> TExec; \ TReceiver receiver(traceVariables, var0); \ actExec.Reset(new TExec(receiver)); \ break; \ } #define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft) \ if (arg1.Type == ft) { \ typedef TBinaryInplaceStatementExecutor, t>, ft> TExec; \ TOperand firstParam(traceVariables, var1, val1, arg1.ParamIdx); \ actExec.Reset(new TExec(receiver, firstParam)); \ break; \ } #define PARSE_BINARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \ if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \ TReceiver receiver(traceVariables, var0); \ FOREACH_RIGHT_TYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO2, n, t, v, fn); \ } #define PARSE_BINARY_STATEMENT_MACRO2(n, t, v, fn, ft) \ if (arg1.Type == ft) { \ typedef TBinaryStatementExecutor, ft> TExec; \ TOperand firstParam(traceVariables, var1, val1, arg1.ParamIdx); \ actExec.Reset(new TExec(receiver, firstParam)); \ break; \ } #define PARSE_BINARY_STATEMENT_MACRO(n, t, v, fn) \ if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \ TReceiver receiver(traceVariables, var0); \ FOREACH_RIGHT_TYPE(PARSE_BINARY_STATEMENT_MACRO2, n, t, v, fn); \ } #define CREATE_OPERAND_GETTER_N(N, type, arg_type) \ if (arg##N.Type == arg_type) { \ operand##N.Reset(new TOperandGetter(TOperand(traceVariables, var##N, val##N, arg##N.ParamIdx))); \ } #define TERNARY_ON_TYPE(n, t, v, fn) \ if ((arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) && (arg2.ParamIdx == size_t(-1) || strcmp(tName2, n) == 0)) { \ TAutoPtr> operand1, operand2; \ FOREACH_LEFT_TYPE(CREATE_OPERAND_GETTER_N, 1, t); \ FOREACH_RIGHT_TYPE(CREATE_OPERAND_GETTER_N, 2, t); \ if (operand1 && operand2) { \ actExec.Reset(new TTernaryStatementExecutor>( \ TReceiver(traceVariables, var0), \ operand1, \ operand2)); \ } \ break; \ } #define IMPLEMENT_TERNARY_STATEMENT(fn) FOR_MATH_PARAMTYPE(TERNARY_ON_TYPE, fn) case ST_INC: FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TInc); break; case ST_DEC: FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TDec); break; case ST_MOV: FOR_MATH_PARAMTYPE(PARSE_BINARY_STATEMENT_MACRO, TTraceSecondArg); break; case ST_ADD_EQ: FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TAddEq); break; case ST_SUB_EQ: FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TSubEq); break; case ST_ADD: IMPLEMENT_TERNARY_STATEMENT(std::plus) break; case ST_SUB: IMPLEMENT_TERNARY_STATEMENT(std::minus) break; case ST_MUL: IMPLEMENT_TERNARY_STATEMENT(std::multiplies) break; case ST_DIV: IMPLEMENT_TERNARY_STATEMENT(std::divides) break; case ST_MOD: IMPLEMENT_TERNARY_STATEMENT(std::modulus) break; default: ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 << " has not supported statement type #" << int(statement.GetType()); } } if (!actExec) { ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 << " can't create action"; } #undef CREATE_OPERAND_GETTER_N #undef TERNARY_ON_TYPE #undef IMPLEMENT_TERNARY_STATEMENT #undef PARSE_TERNARY_STATEMENT_MACRO #undef PARSE_BINARY_STATEMENT_MACRO #undef PARSE_BINARY_INPLACE_STATEMENT_MACRO #undef PARSE_UNARY_INPLACE_STATEMENT_MACRO } else { ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1 << " has not supported action '" << action.ShortDebugString() << "'"; } if (!exec) { exec.Reset(actExec.Release()); last = exec.Get(); } else { last->SetNext(actExec.Release()); last = last->GetNext(); } } if (!probe->Attach(exec.Get())) { ythrow yexception() << "block #" << bi + 1 << " cannot be attached to probe '" << probe->Event.Name << "': no free slots"; } Probes.push_back(std::make_pair(probe, exec.Release())); #else Y_UNUSED(bi); Y_UNUSED(pred); Y_UNUSED(actions); Y_UNUSED(probe); Y_UNUSED(destructiveActionsAllowed); Y_UNUSED(traceVariables); Y_UNUSED(customActionFactory); #endif } TSession::TSession(ui64 traceIdx, TProbeRegistry& registry, const TQuery& query, const bool destructiveActionsAllowed, const TCustomActionFactory& customActionFactory) : StartTime(TInstant::Now()) , TraceIdx(traceIdx) , Registry(registry) , StoreDuration(TDuration::MicroSeconds(query.GetLogDurationUs() * 11 / 10)) // +10% to try avoid truncation while reading multiple threads/traces , ReadDuration(TDuration::MicroSeconds(query.GetLogDurationUs())) , CyclicLog(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000) , DurationLog(StoreDuration) , CyclicDepot(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000) , DurationDepot(StoreDuration) , LastTrackId(0) , LastSpanId(0) , Attached(true) , Query(query) { try { for (size_t bi = 0; bi < query.BlocksSize(); bi++) { const TBlock& block = query.GetBlocks(bi); if (!block.HasProbeDesc()) { ythrow yexception() << "block #" << bi + 1 << " has no probe description"; } const TProbeDesc& pdesc = block.GetProbeDesc(); const TPredicate* pred = block.HasPredicate() ? &block.GetPredicate() : nullptr; if (block.ActionSize() < 1) { ythrow yexception() << "block #" << bi + 1 << " has no action"; } const NProtoBuf::RepeatedPtrField& actions = block.action(); if (pdesc.GetName() && pdesc.GetProvider()) { TProbeRegistry::TProbesAccessor probes(Registry); bool found = false; for (auto& kv : probes) { TProbe* probe = kv.first; if (probe->Event.Name == pdesc.GetName() && probe->Event.GetProvider() == pdesc.GetProvider()) { InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory); found = true; break; } } if (!found) { ythrow yexception() << "block #" << bi + 1 << " has no matching probe with name '" << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "'"; } } else if (pdesc.GetGroup()) { bool found = false; TProbeRegistry::TProbesAccessor probes(Registry); for (auto& kv : probes) { TProbe* probe = kv.first; for (const char* const* gi = probe->Event.Groups; *gi != nullptr; gi++) { if (*gi == pdesc.GetGroup()) { InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory); found = true; break; } } } if (!found) { ythrow yexception() << "block #" << bi + 1 << " has no matching probes for group '" << pdesc.GetGroup() << "'"; } } else { ythrow yexception() << "block #" << bi + 1 << " has bad probe description: name '" << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "' group '" << pdesc.GetGroup() << "'"; } } } catch (...) { Destroy(); throw; } } void TSession::Destroy() { Detach(); for (auto& probe : Probes) { delete probe.second; } } TSession::~TSession() { Destroy(); } void TSession::Detach() { if (Attached) { for (auto& p : Probes) { TProbe* probe = p.first; IExecutor* exec = p.second; probe->Detach(exec); } Attached = false; } } size_t TSession::GetEventsCount() const { return CyclicLog.GetEventsCount() + DurationLog.GetEventsCount() + CyclicDepot.GetEventsCount() + DurationDepot.GetEventsCount(); } size_t TSession::GetThreadsCount() const { return CyclicLog.GetThreadsCount() + DurationLog.GetThreadsCount() + CyclicDepot.GetThreadsCount() + DurationDepot.GetThreadsCount(); } class TReadToProtobuf { private: TMap> Items; public: void ToProtobuf(TLogPb& pb) const { TSet probes; ui64 eventsCount = 0; for (auto kv : Items) { TThreadLogPb* tpb = pb.AddThreadLogs(); tpb->SetThreadId(kv.first); for (TLogItem& item : kv.second) { item.ToProtobuf(*tpb->AddLogItems()); probes.insert(item.Probe); eventsCount++; } } pb.SetEventsCount(eventsCount); for (TProbe* probe : probes) { probe->Event.ToProtobuf(*pb.AddEvents()); } } void Push(TThread::TId tid, const TLogItem& item) { // Avoid any expansive operations in Push(), because it executes under lock and blocks program being traced Items[tid].push_back(item); } }; void TSession::ToProtobuf(TLogPb& pb) const { TReadToProtobuf reader; ReadItems(reader); reader.ToProtobuf(pb); pb.MutableQuery()->CopyFrom(Query); pb.SetCrtTime(TInstant::Now().GetValue()); } TManager::TManager(TProbeRegistry& registry, bool allowDestructiveActions) : Registry(registry) , DestructiveActionsAllowed(allowDestructiveActions) , SerializingExecutor(new TRunLogShuttleActionExecutor(0, {}, nullptr, nullptr, nullptr)) { } TManager::~TManager() { for (auto& trace : Traces) { delete trace.second; } } bool TManager::HasTrace(const TString& id) const { TGuard g(Mtx); return Traces.contains(id); } const TSession* TManager::GetTrace(const TString& id) const { TGuard g(Mtx); TTraces::const_iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { return it->second; } } void TManager::New(const TString& id, const TQuery& query) { TGuard g(Mtx); if (Traces.find(id) == Traces.end()) { TSession* trace = new TSession(++LastTraceIdx, Registry, query, GetDestructiveActionsAllowed(), CustomActionFactory); Traces[id] = trace; } else { ythrow yexception() << "trace id '" << id << "' is already used"; } } void TManager::Delete(const TString& id) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { delete it->second; Traces.erase(it); } } void TManager::Stop(const TString& id) { TGuard g(Mtx); TTraces::iterator it = Traces.find(id); if (it == Traces.end()) { ythrow yexception() << "trace id '" << id << "' is not used"; } else { it->second->Detach(); } } }