1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051 |
- #include "all.h"
- #include "kill_action.h"
- #include "log_shuttle.h"
- #include "preprocessor.h"
- #include "sleep_action.h"
- #include "stderr_writer.h"
- #include "google/protobuf/repeated_field.h"
- #include <util/generic/map.h>
- #include <util/random/random.h>
- #include <functional>
- namespace NLWTrace {
- #ifndef LWTRACE_DISABLE
- // Define static strings for name of each parameter type
- #define FOREACH_PARAMTYPE_MACRO(n, t, v) \
- const char* TParamType<t>::NameString = n; \
- /**/
- FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
- FOR_NIL_PARAMTYPE(FOREACH_PARAMTYPE_MACRO)
- #undef FOREACH_PARAMTYPE_MACRO
- #endif
- void TProbeRegistry::AddProbesList(TProbe** reg) {
- TGuard<TMutex> g(Mutex);
- if (reg == nullptr) {
- return;
- }
- for (TProbe** i = reg; *i != nullptr; i++) {
- AddProbeNoLock(new TStaticBox(*i));
- }
- }
- void TProbeRegistry::AddProbe(const TBoxPtr& box) {
- TGuard<TMutex> g(Mutex);
- AddProbeNoLock(box);
- }
- void TProbeRegistry::RemoveProbe(TProbe* probe) {
- TGuard<TMutex> g(Mutex);
- RemoveProbeNoLock(probe);
- }
- void TProbeRegistry::AddProbeNoLock(const TBoxPtr& box) {
- TProbe* probe = box->GetProbe();
- if (Probes.contains(probe)) {
- return; // silently skip probe double registration
- }
- TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
- Y_ABORT_UNLESS(Ids.count(key) == 0, "duplicate provider:probe pair %s:%s", key.first.data(), key.second.data());
- Probes.emplace(probe, box);
- Ids.insert(key);
- }
- void TProbeRegistry::RemoveProbeNoLock(TProbe* probe) {
- auto iter = Probes.find(probe);
- if (iter != Probes.end()) {
- TIds::key_type key(probe->Event.GetProvider(), probe->Event.Name);
- Ids.erase(key);
- Probes.erase(iter);
- } else {
- // silently skip probe double unregistration
- }
- }
- TAtomic* GetVariablePtr(TSession::TTraceVariables& traceVariables, const TString& name) {
- TSession::TTraceVariables::iterator it = traceVariables.find(name);
- if (it == traceVariables.end()) {
- TAtomicBase zero = 0;
- traceVariables[name] = zero;
- return &traceVariables[name];
- }
- return &((*it).second);
- }
- typedef enum {
- OT_LITERAL = 0,
- OT_PARAMETER = 1,
- OT_VARIABLE = 2
- } EOperandType;
- template <class T, EOperandType>
- class TOperand;
- template <class T>
- class TOperand<T, OT_LITERAL> {
- private:
- T ImmediateValue;
- public:
- TOperand(TSession::TTraceVariables&, const TString&, const TString& value, size_t) {
- ImmediateValue = TParamConv<T>::FromString(value);
- }
- const T& Get(const TParams&) {
- return ImmediateValue;
- }
- };
- template <class T>
- class TOperand<T, OT_PARAMETER> {
- private:
- size_t Idx;
- public:
- TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t idx) {
- Idx = idx;
- }
- const T& Get(const TParams& params) {
- return params.Param[Idx].template Get<T>();
- }
- };
- template <class T>
- class TOperand<T, OT_VARIABLE> {
- private:
- TAtomic* Variable;
- public:
- TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) {
- Variable = GetVariablePtr(traceVariables, name);
- }
- const T Get(const TParams&) {
- return (T)AtomicGet(*Variable);
- }
- void Set(const T& value) {
- AtomicSet(*Variable, value);
- }
- void Inc() {
- AtomicIncrement(*Variable);
- }
- void Dec() {
- AtomicDecrement(*Variable);
- }
- void Add(const TAtomicBase value) {
- AtomicAdd(*Variable, value);
- }
- void Sub(const TAtomicBase value) {
- AtomicSub(*Variable, value);
- }
- };
- template <>
- class TOperand<TCheck, OT_VARIABLE> {
- private:
- TAtomic* Variable;
- public:
- TOperand(TSession::TTraceVariables& traceVariables, const TString& name, const TString&, size_t) {
- Variable = GetVariablePtr(traceVariables, name);
- }
- const TCheck Get(const TParams&) {
- return TCheck(AtomicGet(*Variable));
- }
- void Set(const TCheck& value) {
- AtomicSet(*Variable, value.Value);
- }
- void Add(const TCheck& value) {
- AtomicAdd(*Variable, value.Value);
- }
- void Sub(const TCheck value) {
- AtomicSub(*Variable, value.Value);
- }
- void Inc() {
- AtomicIncrement(*Variable);
- }
- void Dec() {
- AtomicDecrement(*Variable);
- }
- };
- template <>
- class TOperand<TString, OT_VARIABLE> {
- private:
- TString Dummy;
- public:
- TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
- }
- const TString Get(const TParams&) {
- return Dummy;
- }
- void Set(const TString&) {
- }
- };
- template <>
- class TOperand<TSymbol, OT_VARIABLE> {
- private:
- TSymbol Dummy;
- public:
- TOperand(TSession::TTraceVariables&, const TString&, const TString&, size_t) {
- }
- const TSymbol Get(const TParams&) {
- return Dummy;
- }
- void Set(const TSymbol&) {
- }
- };
- // IOperandGetter: hide concrete EOperandType, to save compilation time
- template <class T>
- struct IOperandGetter {
- virtual const T Get(const TParams& params) = 0;
- virtual ~IOperandGetter() {
- }
- };
- template <class T, EOperandType TParam>
- class TOperandGetter: public IOperandGetter<T> {
- private:
- TOperand<T, TParam> Op;
- public:
- TOperandGetter(const TOperand<T, TParam>& op)
- : Op(op)
- {
- }
- const T Get(const TParams& params) override {
- return Op.Get(params);
- }
- };
- template <class T>
- class TReceiver: public TOperand<T, OT_VARIABLE> {
- public:
- TReceiver(TSession::TTraceVariables& traceVariables, const TString& name)
- : TOperand<T, OT_VARIABLE>(traceVariables, name, nullptr, 0)
- {
- }
- };
- template <class TP, class TPredicate>
- static bool CmpFunc(TP a, TP b) {
- return TPredicate()(a, b);
- }
- template <class TP, class TFunc, EOperandType TLhs, EOperandType TRhs>
- class TOperatorExecutor: public IExecutor {
- private:
- bool InvertCompare;
- TOperand<TP, TLhs> Lhs;
- TOperand<TP, TRhs> Rhs;
- bool DoExecute(TOrbit&, const TParams& params) override {
- return TFunc()(Lhs.Get(params), Rhs.Get(params)) != InvertCompare;
- }
- public:
- TOperatorExecutor(const TOperand<TP, TLhs>& lhs, const TOperand<TP, TRhs>& rhs, bool invertCompare)
- : InvertCompare(invertCompare)
- , Lhs(lhs)
- , Rhs(rhs)
- {
- }
- };
- template <class TR, class TP>
- struct TAddEq {
- void operator()(TR& x, TP y) const {
- x.Add(y);
- }
- };
- template <class TR, class TP>
- struct TSubEq {
- void operator()(TR& x, TP y) const {
- x.Sub(y);
- }
- };
- template <class TR>
- struct TInc {
- void operator()(TR& x) const {
- x.Inc();
- }
- };
- template <class TR>
- struct TDec {
- void operator()(TR& x) const {
- x.Dec();
- }
- };
- template <class TP, class TFunc>
- class TUnaryInplaceStatementExecutor: public IExecutor {
- private:
- TFunc Func;
- TReceiver<TP> Receiver;
- bool DoExecute(TOrbit&, const TParams&) override {
- Func(Receiver);
- return true;
- }
- public:
- TUnaryInplaceStatementExecutor(TReceiver<TP>& receiver)
- : Receiver(receiver)
- {
- }
- };
- template <class TP, class TFunc, EOperandType TParam>
- class TBinaryInplaceStatementExecutor: public IExecutor {
- private:
- TFunc Func;
- TReceiver<TP> Receiver;
- TOperand<TP, TParam> Param;
- bool DoExecute(TOrbit&, const TParams& params) override {
- Func(Receiver, Param.Get(params));
- return true;
- }
- public:
- TBinaryInplaceStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TParam>& param)
- : Receiver(receiver)
- , Param(param)
- {
- }
- };
- template <class TP, class TFunc, EOperandType TFirstParam>
- class TBinaryStatementExecutor: public IExecutor {
- private:
- TFunc Func;
- TReceiver<TP> Receiver;
- TOperand<TP, TFirstParam> FirstParam;
- bool DoExecute(TOrbit&, const TParams& params) override {
- Receiver.Set(Func(Receiver.Get(params), FirstParam.Get(params)));
- return true;
- }
- public:
- TBinaryStatementExecutor(TReceiver<TP>& receiver, const TOperand<TP, TFirstParam>& firstParam)
- : Receiver(receiver)
- , FirstParam(firstParam)
- {
- }
- };
- template <class TP, class TFunc>
- class TTernaryStatementExecutor: public IExecutor {
- private:
- TFunc Func;
- TReceiver<TP> Receiver;
- TAutoPtr<IOperandGetter<TP>> FirstParam;
- TAutoPtr<IOperandGetter<TP>> SecondParam;
- bool DoExecute(TOrbit&, const TParams& params) override {
- Receiver.Set(Func(FirstParam->Get(params), SecondParam->Get(params)));
- return true;
- }
- public:
- TTernaryStatementExecutor(const TReceiver<TP>& receiver,
- TAutoPtr<IOperandGetter<TP>> firstParam,
- TAutoPtr<IOperandGetter<TP>> secondParam)
- : Receiver(receiver)
- , FirstParam(firstParam)
- , SecondParam(secondParam)
- {
- }
- };
- template <class TLog>
- class TLogActionExecutor: public IExecutor {
- private:
- bool LogParams;
- bool LogTimestamp;
- intptr_t* MaxRecords;
- TAtomic Records;
- TProbe* Probe;
- TLog* Log;
- bool DoExecute(TOrbit&, const TParams& params) override {
- if (MaxRecords != nullptr) {
- while (true) {
- intptr_t a = AtomicGet(Records);
- if (a >= *MaxRecords) {
- return true;
- }
- if (AtomicCas(&Records, a + 1, a)) {
- Write(params);
- return true;
- }
- }
- } else {
- Write(params);
- return true;
- }
- }
- void Write(const TParams& params) {
- typename TLog::TAccessor la(*Log);
- if (typename TLog::TItem* item = la.Add()) {
- item->Probe = Probe;
- if (LogParams) {
- if ((item->SavedParamsCount = Probe->Event.Signature.ParamCount) > 0) {
- Probe->Event.Signature.CloneParams(item->Params, params);
- }
- } else {
- item->SavedParamsCount = 0;
- }
- if (LogTimestamp) {
- item->Timestamp = TInstant::Now();
- }
- item->TimestampCycles = GetCycleCount();
- }
- }
- public:
- TLogActionExecutor(TProbe* probe, const TLogAction& action, TLog* log)
- : LogParams(!action.GetDoNotLogParams())
- , LogTimestamp(action.GetLogTimestamp())
- , MaxRecords(action.GetMaxRecords() ? new intptr_t(action.GetMaxRecords()) : nullptr)
- , Records(0)
- , Probe(probe)
- , Log(log)
- {
- }
- ~TLogActionExecutor() override {
- delete MaxRecords;
- }
- };
- class TSamplingExecutor: public IExecutor {
- private:
- double SampleRate;
- public:
- explicit TSamplingExecutor(double sampleRate)
- : SampleRate(sampleRate)
- {}
- bool DoExecute(TOrbit&, const TParams&) override {
- return RandomNumber<double>() < SampleRate;
- }
- };
- typedef struct {
- EOperandType Type;
- size_t ParamIdx;
- } TArgumentDescription;
- using TArgumentList = TVector<TArgumentDescription>;
- template <class T>
- void ParseArguments(const T& op, const TSignature& signature, const TString& exceptionPrefix, size_t expectedArgumentCount, TArgumentList& arguments) {
- arguments.clear();
- size_t firstParamIdx = size_t(-1);
- for (size_t argumentIdx = 0; argumentIdx < op.ArgumentSize(); ++argumentIdx) {
- const TArgument& arg = op.GetArgument(argumentIdx);
- TArgumentDescription operand;
- operand.ParamIdx = size_t(-1);
- if (arg.GetVariable()) {
- operand.Type = OT_VARIABLE;
- } else if (arg.GetValue()) {
- operand.Type = OT_LITERAL;
- } else if (arg.GetParam()) {
- operand.Type = OT_PARAMETER;
- operand.ParamIdx = signature.FindParamIndex(arg.GetParam());
- if (operand.ParamIdx == size_t(-1)) {
- ythrow yexception() << exceptionPrefix
- << " argument #" << argumentIdx << " param '" << arg.GetParam()
- << "' doesn't exist";
- }
- if (firstParamIdx == size_t(-1)) {
- firstParamIdx = operand.ParamIdx;
- } else {
- if (strcmp(signature.ParamTypes[firstParamIdx], signature.ParamTypes[operand.ParamIdx]) != 0) {
- ythrow yexception() << exceptionPrefix
- << " param types do not match";
- }
- }
- } else {
- ythrow yexception() << exceptionPrefix
- << " argument #" << argumentIdx
- << " is empty";
- }
- arguments.push_back(operand);
- }
- if (arguments.size() != expectedArgumentCount) {
- ythrow yexception() << exceptionPrefix
- << " incorrect number of arguments (" << arguments.size()
- << " present, " << expectedArgumentCount << " expected)";
- }
- }
- template <class TArg1, class TArg2>
- struct TTraceSecondArg {
- // implementation of deprecated std::project2nd
- TArg1 operator()(const TArg1&, const TArg2& y) const {
- return y;
- }
- };
- void TSession::InsertExecutor(
- TTraceVariables& traceVariables, size_t bi, const TPredicate* pred,
- const NProtoBuf::RepeatedPtrField<TAction>& actions, TProbe* probe,
- const bool destructiveActionsAllowed,
- const TCustomActionFactory& customActionFactory) {
- #ifndef LWTRACE_DISABLE
- THolder<IExecutor> exec;
- IExecutor* last = nullptr;
- TArgumentList arguments;
- if (pred) {
- double sampleRate = pred->GetSampleRate();
- if (sampleRate != 0.0) {
- if (!(0.0 < sampleRate && sampleRate <= 1.0)) {
- ythrow yexception() << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " sampling operator"
- << " invalid sample rate " << sampleRate << ", expected [0;1]";
- }
- exec.Reset(new TSamplingExecutor(sampleRate));
- last = exec.Get();
- }
- for (size_t i = 0; i < pred->OperatorsSize(); i++) {
- const TOperator& op = pred->GetOperators(i);
- TString exceptionPrefix;
- TStringOutput exceptionPrefixOutput(exceptionPrefix);
- exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " operator #" << i + 1;
- ParseArguments<TOperator>(op, probe->Event.Signature, exceptionPrefix, 2, arguments);
- THolder<IExecutor> opExec;
- TArgumentDescription arg0 = arguments.at(0);
- TArgumentDescription arg1 = arguments.at(1);
- const char* tName0 = arg0.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg0.ParamIdx];
- const char* tName1 = arg1.ParamIdx == size_t(-1) ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
- TString var0 = op.GetArgument(0).GetVariable();
- TString var1 = op.GetArgument(1).GetVariable();
- TString val0 = op.GetArgument(0).GetValue();
- TString val1 = op.GetArgument(1).GetValue();
- #define FOREACH_OPERAND_TYPE_RT(n, t, v, fn, lt, rt) \
- if (rt == arg1.Type) { \
- TOperand<t, rt> rhs(traceVariables, var1, val1, arg1.ParamIdx); \
- opExec.Reset(new TOperatorExecutor<t, fn<t>, lt, rt>(lhs, rhs, invertCompare)); \
- break; \
- }
- #define FOREACH_OPERAND_TYPE_LT(n, t, v, fn, lt) \
- if (lt == arg0.Type) { \
- TOperand<t, lt> lhs(traceVariables, var0, val0, arg0.ParamIdx); \
- FOREACH_RIGHT_TYPE(FOREACH_OPERAND_TYPE_RT, n, t, v, fn, lt) \
- }
- #define FOREACH_PARAMTYPE_MACRO(n, t, v, fn) \
- if ((arg0.ParamIdx == size_t(-1) || strcmp(tName0, n) == 0) && (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0)) { \
- FOREACH_LEFT_TYPE(FOREACH_OPERAND_TYPE_LT, n, t, v, fn); \
- }
- bool invertCompare = EqualToOneOf(op.GetType(), OT_NE, OT_GE, OT_LE);
- switch (op.GetType()) {
- case OT_EQ:
- case OT_NE:
- FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::equal_to);
- break;
- case OT_LT:
- case OT_GE:
- FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::less);
- break;
- case OT_GT:
- case OT_LE:
- FOREACH_PARAMTYPE(FOREACH_PARAMTYPE_MACRO, std::greater);
- break;
- default:
- ythrow yexception() << exceptionPrefix
- << " has not supported operator type #" << int(op.GetType());
- }
- #undef FOREACH_OPERAND_TYPE_RT
- #undef FOREACH_OPERAND_TYPE_LT
- #undef FOREACH_PARAMTYPE_MACRO
- if (!opExec) {
- ythrow yexception() << exceptionPrefix
- << " has not supported left param #" << arg0.ParamIdx + 1 << " type '"
- << (arg0.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg0.ParamIdx] : "?")
- << "', or right param #" << arg0.ParamIdx + 1 << " type '"
- << (arg1.ParamIdx != size_t(-1) ? probe->Event.Signature.ParamTypes[arg1.ParamIdx] : "?")
- << "'";
- }
- if (!exec) {
- exec.Reset(opExec.Release());
- last = exec.Get();
- } else {
- last->SetNext(opExec.Release());
- last = last->GetNext();
- }
- }
- }
- for (int i = 0; i < actions.size(); ++i) {
- const TAction& action = actions.Get(i);
- THolder<IExecutor> actExec;
- if (action.HasPrintToStderrAction()) {
- actExec.Reset(new TStderrActionExecutor(probe));
- } else if (action.HasLogAction()) {
- if (Query.GetLogDurationUs()) {
- actExec.Reset(new TLogActionExecutor<TDurationLog>(probe, action.GetLogAction(), &DurationLog));
- } else {
- actExec.Reset(new TLogActionExecutor<TCyclicLog>(probe, action.GetLogAction(), &CyclicLog));
- }
- } else if (action.HasRunLogShuttleAction()) {
- if (Query.GetLogDurationUs()) {
- actExec.Reset(new TRunLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetRunLogShuttleAction(), &DurationDepot, &LastTrackId, &LastSpanId));
- } else {
- actExec.Reset(new TRunLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetRunLogShuttleAction(), &CyclicDepot, &LastTrackId, &LastSpanId));
- }
- } else if (action.HasEditLogShuttleAction()) {
- if (Query.GetLogDurationUs()) {
- actExec.Reset(new TEditLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetEditLogShuttleAction()));
- } else {
- actExec.Reset(new TEditLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetEditLogShuttleAction()));
- }
- } else if (action.HasDropLogShuttleAction()) {
- if (Query.GetLogDurationUs()) {
- actExec.Reset(new TDropLogShuttleActionExecutor<TDurationDepot>(TraceIdx, action.GetDropLogShuttleAction()));
- } else {
- actExec.Reset(new TDropLogShuttleActionExecutor<TCyclicDepot>(TraceIdx, action.GetDropLogShuttleAction()));
- }
- } else if (action.HasCustomAction()) {
- THolder<TCustomActionExecutor> customExec(customActionFactory.Create(probe, action.GetCustomAction(), this));
- if (customExec) {
- if (!customExec->IsDestructive() || destructiveActionsAllowed) {
- actExec.Reset(customExec.Release());
- } else {
- ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
- << " contains destructive CustomAction, but destructive actions are disabled."
- << " Please, consider using --unsafe-lwtrace command line parameter.";
- }
- } else {
- ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
- << " contains unregistered CustomAction '" << action.GetCustomAction().GetName() << "'";
- }
- } else if (action.HasKillAction()) {
- if (destructiveActionsAllowed) {
- actExec.Reset(new NPrivate::TKillActionExecutor(probe));
- } else {
- ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
- << " contains destructive KillAction, but destructive actions are disabled."
- << " Please, consider using --unsafe-lwtrace command line parameter.";
- }
- } else if (action.HasSleepAction()) {
- if (destructiveActionsAllowed) {
- const TSleepAction& sleepAction = action.GetSleepAction();
- if (sleepAction.GetNanoSeconds()) {
- ui64 nanoSeconds = sleepAction.GetNanoSeconds();
- actExec.Reset(new NPrivate::TSleepActionExecutor(probe, nanoSeconds));
- } else {
- ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
- << " SleepAction missing parameter 'NanoSeconds'";
- }
- } else {
- ythrow yexception() << "probe '" << probe->Event.Name << "block #" << bi + 1 << " action #" << i + 1
- << " contains destructive SleepAction, but destructive actions are disabled."
- << " Please, consider using --unsafe-lwtrace command line parameter.";
- }
- } else if (action.HasStatementAction()) {
- const TStatementAction& statement = action.GetStatementAction();
- TString exceptionPrefix;
- TStringOutput exceptionPrefixOutput(exceptionPrefix);
- exceptionPrefixOutput << "probe '" << probe->Event.Name << "' block #" << bi + 1 << " action #" << i + 1;
- size_t expectedArgumentCount = 3;
- if (statement.GetType() == ST_MOV || statement.GetType() == ST_ADD_EQ || statement.GetType() == ST_SUB_EQ) {
- expectedArgumentCount = 2;
- } else if (statement.GetType() == ST_INC || statement.GetType() == ST_DEC) {
- expectedArgumentCount = 1;
- }
- ParseArguments<TStatementAction>(statement, probe->Event.Signature, exceptionPrefix, expectedArgumentCount, arguments);
- TArgumentDescription arg0 = (expectedArgumentCount <= 0) ? TArgumentDescription() : arguments.at(0);
- TArgumentDescription arg1 = (expectedArgumentCount <= 1) ? TArgumentDescription() : arguments.at(1);
- TArgumentDescription arg2 = (expectedArgumentCount <= 2) ? TArgumentDescription() : arguments.at(2);
- TString var0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetVariable();
- TString var1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetVariable();
- TString var2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetVariable();
- TString val0 = (expectedArgumentCount <= 0) ? "" : statement.GetArgument(0).GetValue();
- TString val1 = (expectedArgumentCount <= 1) ? "" : statement.GetArgument(1).GetValue();
- TString val2 = (expectedArgumentCount <= 2) ? "" : statement.GetArgument(2).GetValue();
- const char* tName1 = (expectedArgumentCount <= 1 || arg1.ParamIdx == size_t(-1))
- ? nullptr : probe->Event.Signature.ParamTypes[arg1.ParamIdx];
- const char* tName2 = (expectedArgumentCount <= 2 || arg2.ParamIdx == size_t(-1))
- ? nullptr : probe->Event.Signature.ParamTypes[arg2.ParamIdx];
- if (arg0.Type == OT_VARIABLE) {
- switch (statement.GetType()) {
- #define PARSE_UNARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \
- { \
- typedef TUnaryInplaceStatementExecutor<t, fn<TReceiver<t>>> TExec; \
- TReceiver<t> receiver(traceVariables, var0); \
- actExec.Reset(new TExec(receiver)); \
- break; \
- }
- #define PARSE_BINARY_INPLACE_STATEMENT_MACRO2(n, t, v, fn, ft) \
- if (arg1.Type == ft) { \
- typedef TBinaryInplaceStatementExecutor<t, fn<TReceiver<t>, t>, ft> TExec; \
- TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
- actExec.Reset(new TExec(receiver, firstParam)); \
- break; \
- }
- #define PARSE_BINARY_INPLACE_STATEMENT_MACRO(n, t, v, fn) \
- if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \
- TReceiver<t> receiver(traceVariables, var0); \
- FOREACH_RIGHT_TYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO2, n, t, v, fn); \
- }
- #define PARSE_BINARY_STATEMENT_MACRO2(n, t, v, fn, ft) \
- if (arg1.Type == ft) { \
- typedef TBinaryStatementExecutor<t, fn<t, t>, ft> TExec; \
- TOperand<t, ft> firstParam(traceVariables, var1, val1, arg1.ParamIdx); \
- actExec.Reset(new TExec(receiver, firstParam)); \
- break; \
- }
- #define PARSE_BINARY_STATEMENT_MACRO(n, t, v, fn) \
- if (arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) { \
- TReceiver<t> receiver(traceVariables, var0); \
- FOREACH_RIGHT_TYPE(PARSE_BINARY_STATEMENT_MACRO2, n, t, v, fn); \
- }
- #define CREATE_OPERAND_GETTER_N(N, type, arg_type) \
- if (arg##N.Type == arg_type) { \
- operand##N.Reset(new TOperandGetter<type, arg_type>(TOperand<type, arg_type>(traceVariables, var##N, val##N, arg##N.ParamIdx))); \
- }
- #define TERNARY_ON_TYPE(n, t, v, fn) \
- if ((arg1.ParamIdx == size_t(-1) || strcmp(tName1, n) == 0) && (arg2.ParamIdx == size_t(-1) || strcmp(tName2, n) == 0)) { \
- TAutoPtr<IOperandGetter<t>> operand1, operand2; \
- FOREACH_LEFT_TYPE(CREATE_OPERAND_GETTER_N, 1, t); \
- FOREACH_RIGHT_TYPE(CREATE_OPERAND_GETTER_N, 2, t); \
- if (operand1 && operand2) { \
- actExec.Reset(new TTernaryStatementExecutor<t, fn<t>>( \
- TReceiver<t>(traceVariables, var0), \
- operand1, \
- operand2)); \
- } \
- break; \
- }
- #define IMPLEMENT_TERNARY_STATEMENT(fn) FOR_MATH_PARAMTYPE(TERNARY_ON_TYPE, fn)
- case ST_INC:
- FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TInc);
- break;
- case ST_DEC:
- FOR_MATH_PARAMTYPE(PARSE_UNARY_INPLACE_STATEMENT_MACRO, TDec);
- break;
- case ST_MOV:
- FOR_MATH_PARAMTYPE(PARSE_BINARY_STATEMENT_MACRO, TTraceSecondArg);
- break;
- case ST_ADD_EQ:
- FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TAddEq);
- break;
- case ST_SUB_EQ:
- FOR_MATH_PARAMTYPE(PARSE_BINARY_INPLACE_STATEMENT_MACRO, TSubEq);
- break;
- case ST_ADD:
- IMPLEMENT_TERNARY_STATEMENT(std::plus)
- break;
- case ST_SUB:
- IMPLEMENT_TERNARY_STATEMENT(std::minus)
- break;
- case ST_MUL:
- IMPLEMENT_TERNARY_STATEMENT(std::multiplies)
- break;
- case ST_DIV:
- IMPLEMENT_TERNARY_STATEMENT(std::divides)
- break;
- case ST_MOD:
- IMPLEMENT_TERNARY_STATEMENT(std::modulus)
- break;
- default:
- ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
- << " has not supported statement type #" << int(statement.GetType());
- }
- }
- if (!actExec) {
- ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
- << " can't create action";
- }
- #undef CREATE_OPERAND_GETTER_N
- #undef TERNARY_ON_TYPE
- #undef IMPLEMENT_TERNARY_STATEMENT
- #undef PARSE_TERNARY_STATEMENT_MACRO
- #undef PARSE_BINARY_STATEMENT_MACRO
- #undef PARSE_BINARY_INPLACE_STATEMENT_MACRO
- #undef PARSE_UNARY_INPLACE_STATEMENT_MACRO
- } else {
- ythrow yexception() << "block #" << bi + 1 << " action #" << i + 1
- << " has not supported action '" << action.ShortDebugString() << "'";
- }
- if (!exec) {
- exec.Reset(actExec.Release());
- last = exec.Get();
- } else {
- last->SetNext(actExec.Release());
- last = last->GetNext();
- }
- }
- if (!probe->Attach(exec.Get())) {
- ythrow yexception() << "block #" << bi + 1
- << " cannot be attached to probe '" << probe->Event.Name << "': no free slots";
- }
- Probes.push_back(std::make_pair(probe, exec.Release()));
- #else
- Y_UNUSED(bi);
- Y_UNUSED(pred);
- Y_UNUSED(actions);
- Y_UNUSED(probe);
- Y_UNUSED(destructiveActionsAllowed);
- Y_UNUSED(traceVariables);
- Y_UNUSED(customActionFactory);
- #endif
- }
- TSession::TSession(ui64 traceIdx,
- TProbeRegistry& registry,
- const TQuery& query,
- const bool destructiveActionsAllowed,
- const TCustomActionFactory& customActionFactory)
- : StartTime(TInstant::Now())
- , TraceIdx(traceIdx)
- , Registry(registry)
- , StoreDuration(TDuration::MicroSeconds(query.GetLogDurationUs() * 11 / 10)) // +10% to try avoid truncation while reading multiple threads/traces
- , ReadDuration(TDuration::MicroSeconds(query.GetLogDurationUs()))
- , CyclicLog(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
- , DurationLog(StoreDuration)
- , CyclicDepot(query.GetPerThreadLogSize() ? query.GetPerThreadLogSize() : 1000)
- , DurationDepot(StoreDuration)
- , LastTrackId(0)
- , LastSpanId(0)
- , Attached(true)
- , Query(query)
- {
- try {
- for (size_t bi = 0; bi < query.BlocksSize(); bi++) {
- const TBlock& block = query.GetBlocks(bi);
- if (!block.HasProbeDesc()) {
- ythrow yexception() << "block #" << bi + 1 << " has no probe description";
- }
- const TProbeDesc& pdesc = block.GetProbeDesc();
- const TPredicate* pred = block.HasPredicate() ? &block.GetPredicate() : nullptr;
- if (block.ActionSize() < 1) {
- ythrow yexception() << "block #" << bi + 1 << " has no action";
- }
- const NProtoBuf::RepeatedPtrField<TAction>& actions = block.action();
- if (pdesc.GetName() && pdesc.GetProvider()) {
- TProbeRegistry::TProbesAccessor probes(Registry);
- bool found = false;
- for (auto& kv : probes) {
- TProbe* probe = kv.first;
- if (probe->Event.Name == pdesc.GetName() && probe->Event.GetProvider() == pdesc.GetProvider()) {
- InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory);
- found = true;
- break;
- }
- }
- if (!found) {
- ythrow yexception() << "block #" << bi + 1 << " has no matching probe with name '"
- << pdesc.GetName() << "' provider '" << pdesc.GetProvider() << "'";
- }
- } else if (pdesc.GetGroup()) {
- bool found = false;
- TProbeRegistry::TProbesAccessor probes(Registry);
- for (auto& kv : probes) {
- TProbe* probe = kv.first;
- for (const char* const* gi = probe->Event.Groups; *gi != nullptr; gi++) {
- if (*gi == pdesc.GetGroup()) {
- InsertExecutor(TraceVariables, bi, pred, actions, probe, destructiveActionsAllowed, customActionFactory);
- found = true;
- break;
- }
- }
- }
- if (!found) {
- ythrow yexception() << "block #" << bi + 1
- << " has no matching probes for group '" << pdesc.GetGroup() << "'";
- }
- } else {
- ythrow yexception() << "block #" << bi + 1 << " has bad probe description: name '" << pdesc.GetName()
- << "' provider '" << pdesc.GetProvider()
- << "' group '" << pdesc.GetGroup() << "'";
- }
- }
- } catch (...) {
- Destroy();
- throw;
- }
- }
- void TSession::Destroy() {
- Detach();
- for (auto& probe : Probes) {
- delete probe.second;
- }
- }
- TSession::~TSession() {
- Destroy();
- }
- void TSession::Detach() {
- if (Attached) {
- for (auto& p : Probes) {
- TProbe* probe = p.first;
- IExecutor* exec = p.second;
- probe->Detach(exec);
- }
- Attached = false;
- }
- }
- size_t TSession::GetEventsCount() const {
- return CyclicLog.GetEventsCount() + DurationLog.GetEventsCount() + CyclicDepot.GetEventsCount() + DurationDepot.GetEventsCount();
- }
- size_t TSession::GetThreadsCount() const {
- return CyclicLog.GetThreadsCount() + DurationLog.GetThreadsCount() + CyclicDepot.GetThreadsCount() + DurationDepot.GetThreadsCount();
- }
- class TReadToProtobuf {
- private:
- TMap<TThread::TId, TVector<TLogItem>> Items;
- public:
- void ToProtobuf(TLogPb& pb) const {
- TSet<TProbe*> probes;
- ui64 eventsCount = 0;
- for (auto kv : Items) {
- TThreadLogPb* tpb = pb.AddThreadLogs();
- tpb->SetThreadId(kv.first);
- for (TLogItem& item : kv.second) {
- item.ToProtobuf(*tpb->AddLogItems());
- probes.insert(item.Probe);
- eventsCount++;
- }
- }
- pb.SetEventsCount(eventsCount);
- for (TProbe* probe : probes) {
- probe->Event.ToProtobuf(*pb.AddEvents());
- }
- }
- void Push(TThread::TId tid, const TLogItem& item) {
- // Avoid any expansive operations in Push(), because it executes under lock and blocks program being traced
- Items[tid].push_back(item);
- }
- };
- void TSession::ToProtobuf(TLogPb& pb) const {
- TReadToProtobuf reader;
- ReadItems(reader);
- reader.ToProtobuf(pb);
- pb.MutableQuery()->CopyFrom(Query);
- pb.SetCrtTime(TInstant::Now().GetValue());
- }
- TManager::TManager(TProbeRegistry& registry, bool allowDestructiveActions)
- : Registry(registry)
- , DestructiveActionsAllowed(allowDestructiveActions)
- , SerializingExecutor(new TRunLogShuttleActionExecutor<TCyclicDepot>(0, {}, nullptr, nullptr, nullptr))
- {
- }
- TManager::~TManager() {
- for (auto& trace : Traces) {
- delete trace.second;
- }
- }
- bool TManager::HasTrace(const TString& id) const {
- TGuard<TMutex> g(Mtx);
- return Traces.contains(id);
- }
- const TSession* TManager::GetTrace(const TString& id) const {
- TGuard<TMutex> g(Mtx);
- TTraces::const_iterator it = Traces.find(id);
- if (it == Traces.end()) {
- ythrow yexception() << "trace id '" << id << "' is not used";
- } else {
- return it->second;
- }
- }
- void TManager::New(const TString& id, const TQuery& query) {
- TGuard<TMutex> g(Mtx);
- if (Traces.find(id) == Traces.end()) {
- TSession* trace = new TSession(++LastTraceIdx, Registry, query, GetDestructiveActionsAllowed(), CustomActionFactory);
- Traces[id] = trace;
- } else {
- ythrow yexception() << "trace id '" << id << "' is already used";
- }
- }
- void TManager::Delete(const TString& id) {
- TGuard<TMutex> g(Mtx);
- TTraces::iterator it = Traces.find(id);
- if (it == Traces.end()) {
- ythrow yexception() << "trace id '" << id << "' is not used";
- } else {
- delete it->second;
- Traces.erase(it);
- }
- }
- void TManager::Stop(const TString& id) {
- TGuard<TMutex> g(Mtx);
- TTraces::iterator it = Traces.find(id);
- if (it == Traces.end()) {
- ythrow yexception() << "trace id '" << id << "' is not used";
- } else {
- it->second->Detach();
- }
- }
- }
|