log.h

#pragma once

#include "probe.h"

#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
#include <util/generic/deque.h>
#include <util/generic/noncopyable.h>
#include <util/generic/vector.h>
#include <util/string/printf.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/hp_timer.h>
#include <util/system/mutex.h>
#include <util/system/spinlock.h>
#include <util/system/thread.h>
#include <util/system/tls.h>

namespace NLWTrace {
    // Cyclic buffer that pushes items to its back and pops an item from its front on overflow
    template <class TItem>
    class TCyclicBuffer: public TNonCopyable {
    private:
        TVector<TItem> Data;
        TItem* Front; // Points to the first item (valid iff Size > 0)
        TItem* Back; // Points to the last item (valid iff Size > 0)
        size_t Size; // Number of items in the buffer

        TItem* First() {
            return &*Data.begin();
        }
        TItem* Last() {
            return &*Data.end();
        }
        const TItem* First() const {
            return &*Data.begin();
        }
        const TItem* Last() const {
            return &*Data.end();
        }

    public:
        explicit TCyclicBuffer(size_t capacity)
            : Data(capacity)
            , Size(0)
        {
        }
        TItem* Add() {
            if (Size != 0) {
                Inc(Back);
                if (Back == Front) {
                    Inc(Front); // Forget (pop_front) old items
                } else {
                    Size++;
                }
            } else {
                Front = Back = First();
                Size = 1;
            }
            Back->Clear();
            return Back;
        }
        TItem* GetFront() {
            return Front;
        }
        TItem* GetBack() {
            return Back;
        }
        const TItem* GetFront() const {
            return Front;
        }
        const TItem* GetBack() const {
            return Back;
        }
        size_t GetSize() const {
            return Size;
        }
        bool IsFull() const {
            return Size == Data.size();
        }
        void Inc(TItem*& it) {
            it++;
            if (it == Last()) {
                it = First();
            }
        }
        void Inc(const TItem*& it) const {
            it++;
            if (it == Last()) {
                it = First();
            }
        }
        void Destroy() {
            Data.clear();
            Size = 0;
        }
        void Clear() {
            Size = 0;
        }
        void Swap(TCyclicBuffer& other) {
            Data.swap(other.Data);
            std::swap(Front, other.Front);
            std::swap(Back, other.Back);
            std::swap(Size, other.Size);
        }
    };
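    // Usage sketch (illustrative, not part of the original header). TCyclicBuffer
    // holds at most `capacity` items and silently overwrites the oldest one on
    // overflow; the item type only needs a Clear() method. A typical write/scan
    // cycle looks like this:
    //
    //   TCyclicBuffer<TLogItem> buf(1024);
    //   TLogItem* slot = buf.Add();          // reuses the oldest slot when full
    //   if (buf.GetSize() > 0) {
    //       for (const TLogItem* i = buf.GetFront(), *e = buf.GetBack();; buf.Inc(i)) {
    //           /* consume *i */
    //           if (i == e) {
    //               break;
    //           }
    //       }
    //   }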
    // Buffer that pushes items to its back and pops items from its front when they expire
    template <class TItem>
    class TDurationBuffer: public TNonCopyable {
    protected:
        TDeque<TItem> Data;
        ui64 StoreDuration;
        ui8 CleanupCounter = 0;

    public:
        explicit TDurationBuffer(TDuration duration)
            : StoreDuration(DurationToCycles(duration))
        {
        }
        TItem* Add() {
            if (!CleanupCounter) {
                Cleanup();
                CleanupCounter = 128; // Perform cleanup after every 128 additions
            }
            CleanupCounter--;
            Data.emplace_back();
            return &Data.back();
        }
        TItem* GetFront() {
            return &Data.front();
        }
        TItem* GetBack() {
            return &Data.back();
        }
        const TItem* GetFront() const {
            return &Data.front();
        }
        const TItem* GetBack() const {
            return &Data.back();
        }
        size_t GetSize() const {
            return Data.size();
        }
        bool Empty() const {
            return Data.empty();
        }
        void Destroy() {
            Data.clear();
        }
        void Swap(TDurationBuffer& other) {
            Data.swap(other.Data);
            std::swap(StoreDuration, other.StoreDuration);
        }

    private:
        void Cleanup() {
            ui64 cutoff = GetCycleCount();
            if (cutoff > StoreDuration) {
                cutoff -= StoreDuration;
                while (!Data.empty() && Data.front().GetTimestampCycles() < cutoff) {
                    Data.pop_front();
                }
            }
        }
    };
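    // Usage sketch (illustrative): TDurationBuffer keeps items whose timestamp is
    // within `duration` of the current cycle counter; expired items are pruned
    // lazily, once per 128 calls to Add(). The item type must provide
    // GetTimestampCycles(), as TLogItem below does.
    //
    //   TDurationBuffer<TLogItem> buf(TDuration::Seconds(5));
    //   TLogItem* slot = buf.Add();              // items older than ~5s get pruned lazily
    //   slot->TimestampCycles = GetCycleCount();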
    struct TLogItem {
        TProbe* Probe = nullptr;
        TParams Params;
        size_t SavedParamsCount;
        TInstant Timestamp;
        ui64 TimestampCycles;

        TLogItem() {
        }
        TLogItem(const TLogItem& other)
            : Probe(other.Probe)
            , SavedParamsCount(other.SavedParamsCount)
            , Timestamp(other.Timestamp)
            , TimestampCycles(other.TimestampCycles)
        {
            Clone(other);
        }
        ~TLogItem() {
            Destroy();
        }
        TLogItem& operator=(const TLogItem& other) {
            Destroy();
            Probe = other.Probe;
            SavedParamsCount = other.SavedParamsCount;
            Timestamp = other.Timestamp;
            TimestampCycles = other.TimestampCycles;
            Clone(other);
            return *this;
        }
        void Clear() {
            Destroy();
            Probe = nullptr;
        }
        void ToProtobuf(TLogItemPb& pb) const {
            pb.SetName(Probe->Event.Name);
            pb.SetProvider(Probe->Event.GetProvider());
            if (SavedParamsCount > 0) {
                TString paramValues[LWTRACE_MAX_PARAMS];
                Probe->Event.Signature.SerializeParams(Params, paramValues);
                for (size_t pi = 0; pi < SavedParamsCount; pi++) {
                    pb.AddParams(paramValues[pi]);
                }
            }
            pb.SetTimestamp(Timestamp.GetValue());
            pb.SetTimestampCycles(TimestampCycles);
        }
        TTypedParam GetParam(const TString& param) const {
            if (SavedParamsCount == 0) {
                return TTypedParam();
            } else {
                size_t idx = Probe->Event.Signature.FindParamIndex(param);
                if (idx >= SavedParamsCount) { // Also covers the idx == -1 case (not found)
                    return TTypedParam();
                } else {
                    EParamTypePb type = ParamTypeToProtobuf(Probe->Event.Signature.ParamTypes[idx]);
                    return TTypedParam(type, Params.Param[idx]);
                }
            }
        }
        ui64 GetTimestampCycles() const {
            return TimestampCycles;
        }

    private:
        void Clone(const TLogItem& other) {
            if (Probe && SavedParamsCount > 0) {
                Probe->Event.Signature.CloneParams(Params, other.Params);
            }
        }
        void Destroy() {
            if (Probe && SavedParamsCount > 0) {
                Probe->Event.Signature.DestroyParams(Params);
            }
        }
    };
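    // Usage sketch (illustrative): given a captured item (here called `item`, with
    // its Probe set), it can be serialized to its protobuf form or queried for a
    // single saved parameter by name; GetParam() returns an empty TTypedParam when
    // the parameter was not saved. The parameter name below is a made-up example.
    //
    //   TLogItemPb pb;
    //   item.ToProtobuf(pb);
    //   TTypedParam value = item.GetParam("latencyMs");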
    struct TTrackLog {
        struct TItem : TLogItem {
            TThread::TId ThreadId;

            TItem() = default;
            TItem(TThread::TId tid, const TLogItem& item)
                : TLogItem(item)
                , ThreadId(tid)
            {
            }
        };
        using TItems = TVector<TItem>;

        TItems Items;
        bool Truncated = false;
        ui64 Id = 0;

        void Clear() {
            Items.clear();
            Truncated = false;
        }
        ui64 GetTimestampCycles() const {
            return Items.empty() ? 0 : Items.front().GetTimestampCycles();
        }
    };
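    // Usage sketch (illustrative): TTrackLog is the item type of the "depot" logs
    // below (TCyclicDepot, TDurationDepot); it groups the TLogItem entries of one
    // track together with the id of the thread that produced each of them.
    //
    //   TTrackLog track;
    //   track.Id = 42;                                                  // arbitrary example id
    //   track.Items.emplace_back(TThread::CurrentThreadId(), logItem);  // logItem: some TLogItem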
    // Log that uses per-thread cyclic buffers to store items
    template <class T>
    class TCyclicLogImpl: public TNonCopyable {
    public:
        using TLog = TCyclicLogImpl;
        using TItem = T;

    private:
        using TBuffer = TCyclicBuffer<TItem>;
        class TStorage {
        private:
            // Data that can be accessed in a lock-free way from reader/writer
            TAtomic Writers = 0;
            mutable TBuffer* volatile CurBuffer = nullptr;

            // Data that can be accessed only from the reader
            // NOTE: multiple readers are serialized by TCyclicLogImpl::Lock
            mutable TBuffer* OldBuffer = nullptr;
            mutable TBuffer* NewBuffer = nullptr;

            TLog* volatile Log = nullptr;
            TThread::TId ThreadId = 0;
            TAtomic EventsCount = 0;

        public:
            TStorage() {
            }
            explicit TStorage(TLog* log)
                : CurBuffer(new TBuffer(log->GetCapacity()))
                , OldBuffer(new TBuffer(log->GetCapacity()))
                , NewBuffer(new TBuffer(log->GetCapacity()))
                , Log(log)
                , ThreadId(TThread::CurrentThreadId())
            {
                Log->RegisterThread(this);
            }
            ~TStorage() {
                if (TLog* log = AtomicSwap(&Log, nullptr)) {
                    AtomicBarrier(); // Serialize `Log' and TCyclicLogImpl::Lock memory order
                    // NOTE: the following function swaps `this' with `new TStorage()'
                    log->UnregisterThreadAndMakeOrphan(this);
                } else {
                    // NOTE: `Log' can be nullptr if either this is an orphan storage or TryDismiss() succeeded
                    // NOTE: in both cases it is ok to call these deletes
                    delete CurBuffer;
                    delete OldBuffer;
                    delete NewBuffer;
                }
            }
            bool TryDismiss() {
                // TCyclicLogImpl::Lock implied (no readers)
                if (TLog* log = AtomicSwap(&Log, nullptr)) {
                    TBuffer* curBuffer = AtomicSwap(&CurBuffer, nullptr);
                    WaitForWriters();
                    // At this point we guarantee that there is no active writer and there won't be one
                    delete curBuffer;
                    delete OldBuffer;
                    delete NewBuffer;
                    OldBuffer = nullptr;
                    NewBuffer = nullptr;
                    return true;
                } else {
                    // ~TStorage() is in progress
                    return false;
                }
            }
            void WaitForWriters() const {
                while (AtomicGet(Writers) > 0) {
                    SpinLockPause();
                }
            }
            TThread::TId GetThreadId() const {
                // TCyclicLogImpl::Lock implied (no readers)
                return ThreadId;
            }
            size_t GetEventsCount() const {
                // TCyclicLogImpl::Lock implied (no readers)
                return AtomicGet(EventsCount);
            }
            void Swap(TStorage& other) {
                // TCyclicLogImpl::Lock implied (no readers)
                std::swap(CurBuffer, other.CurBuffer);
                std::swap(OldBuffer, other.OldBuffer);
                std::swap(NewBuffer, other.NewBuffer);
                std::swap(Log, other.Log);
                std::swap(ThreadId, other.ThreadId);
                std::swap(EventsCount, other.EventsCount);
            }
            TBuffer* StartWriter() {
                AtomicIncrement(Writers);
                return const_cast<TBuffer*>(AtomicGet(CurBuffer));
            }
            void StopWriter() {
                AtomicDecrement(Writers);
            }
            void IncEventsCount() {
                AtomicIncrement(EventsCount);
            }
            template <class TReader>
            void ReadItems(TReader& r) const {
                // TCyclicLogImpl::Lock implied
                NewBuffer = AtomicSwap(&CurBuffer, NewBuffer);
                WaitForWriters();
                // Merge the new buffer into the old buffer
                if (NewBuffer->IsFull()) {
                    std::swap(NewBuffer, OldBuffer);
                } else {
                    if (NewBuffer->GetSize() > 0) {
                        for (const TItem *i = NewBuffer->GetFront(), *e = NewBuffer->GetBack();; NewBuffer->Inc(i)) {
                            TItem* oldSlot = OldBuffer->Add();
                            *oldSlot = *i;
                            if (i == e) {
                                break;
                            }
                        }
                    }
                }
                NewBuffer->Clear();
                // Iterate over the old buffer
                if (OldBuffer->GetSize() > 0) {
                    for (const TItem *i = OldBuffer->GetFront(), *e = OldBuffer->GetBack();; OldBuffer->Inc(i)) {
                        r.Push(ThreadId, *i);
                        if (i == e) {
                            break;
                        }
                    }
                }
            }
            template <class TReader>
            void ExtractItems(TReader& r) {
                ReadItems(r);
                for (TItem *i = OldBuffer->GetFront(), *e = OldBuffer->GetBack();; OldBuffer->Inc(i)) {
                    i->Clear();
                    if (i == e) {
                        break;
                    }
                }
                OldBuffer->Clear();
            }
        };

        size_t Capacity;
        Y_THREAD(TStorage)
        PerThreadStorage;
        TSpinLock Lock;
        // If a thread exits, its storage is destroyed, so we move it into OrphanStorages before destruction
        TVector<TAtomicSharedPtr<TStorage>> OrphanStorages;
        typedef TVector<TStorage*> TStoragesVec;
        TStoragesVec StoragesVec;
        TAtomic ThreadsCount;

    public:
        explicit TCyclicLogImpl(size_t capacity)
            : Capacity(capacity)
            , PerThreadStorage(this)
            , ThreadsCount(0)
        {
        }
        ~TCyclicLogImpl() {
            for (bool again = true; again;) {
                TGuard<TSpinLock> g(Lock);
                AtomicBarrier(); // Serialize `storage->Log' and Lock memory order
                again = false;
                while (!StoragesVec.empty()) {
                    TStorage* storage = StoragesVec.back();
                    // The TStorage destructor can be called when TCyclicLogImpl has already been destroyed,
                    // so we ensure this does not lead to problems.
                    // NOTE: Y_THREAD(TStorage) destructs the TStorage object for a specific thread only on that thread's exit
                    // NOTE: this issue can lead to memory leaks if threads never exit and many TCyclicLogImpl objects are created
                    if (storage->TryDismiss()) {
                        StoragesVec.pop_back();
                    } else {
                        // Rare case when another thread is running ~TStorage() -- let it finish
                        again = true;
                        SpinLockPause();
                        break;
                    }
                }
            }
        }
        size_t GetCapacity() const {
            return Capacity;
        }
        size_t GetEventsCount() const {
            size_t events = 0;
            TGuard<TSpinLock> g(Lock);
            for (auto i : StoragesVec) {
                events += i->GetEventsCount();
            }
            for (const auto& orphanStorage : OrphanStorages) {
                events += orphanStorage->GetEventsCount();
            }
            return events;
        }
        size_t GetThreadsCount() const {
            return AtomicGet(ThreadsCount);
        }
        void RegisterThread(TStorage* storage) {
            TGuard<TSpinLock> g(Lock);
            StoragesVec.push_back(storage);
            AtomicIncrement(ThreadsCount);
        }
        void UnregisterThreadAndMakeOrphan(TStorage* storage) {
            TGuard<TSpinLock> g(Lock);
            // `storage' writers are not possible at this scope because
            // UnregisterThreadAndMakeOrphan is only called from exiting threads.
            // `storage' readers are not possible at this scope due to the Lock guard.
            Erase(StoragesVec, storage);
            TAtomicSharedPtr<TStorage> orphan(new TStorage());
            orphan->Swap(*storage); // Swap is required because we cannot take ownership from the Y_THREAD(TStorage) object
            OrphanStorages.push_back(orphan);
        }
        template <class TReader>
        void ReadThreads(TReader& r) const {
            TGuard<TSpinLock> g(Lock);
            for (auto i : StoragesVec) {
                r.PushThread(i->GetThreadId());
            }
            for (const auto& orphanStorage : OrphanStorages) {
                r.PushThread(orphanStorage->GetThreadId());
            }
        }
        template <class TReader>
        void ReadItems(TReader& r) const {
            TGuard<TSpinLock> g(Lock);
            for (auto i : StoragesVec) {
                i->ReadItems(r);
            }
            for (const auto& orphanStorage : OrphanStorages) {
                orphanStorage->ReadItems(r);
            }
        }
        template <class TReader>
        void ExtractItems(TReader& r) const {
            TGuard<TSpinLock> g(Lock);
            for (auto i : StoragesVec) {
                i->ExtractItems(r);
            }
            for (const auto& orphanStorage : OrphanStorages) {
                orphanStorage->ExtractItems(r);
            }
        }
        class TAccessor {
        private:
            TStorage& Storage;
            TBuffer* Buffer;

        public:
            explicit TAccessor(TLog& log)
                : Storage(log.PerThreadStorage.Get())
                , Buffer(Storage.StartWriter())
            {
            }
            ~TAccessor() {
                Storage.StopWriter();
            }
            TItem* Add() {
                if (Buffer) {
                    Storage.IncEventsCount();
                    return Buffer->Add();
                } else {
                    // TStorage has been detached from the trace due to trace destruction,
                    // so we should not try to log anything
                    return nullptr;
                }
            }
        };
        friend class TAccessor;
    };
    using TCyclicLog = TCyclicLogImpl<TLogItem>;
    using TCyclicDepot = TCyclicLogImpl<TTrackLog>;
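    // Usage sketch (illustrative, reader type is hypothetical): writers obtain a
    // per-thread TAccessor and fill the slot it returns; readers pass an object
    // with Push(TThread::TId, const TItem&) to ReadItems() and with
    // PushThread(TThread::TId) to ReadThreads().
    //
    //   TCyclicLog log(1000);
    //
    //   {   // writer side, on any thread
    //       TCyclicLog::TAccessor a(log);
    //       if (TLogItem* item = a.Add()) {
    //           item->Timestamp = TInstant::Now();
    //           item->TimestampCycles = GetCycleCount();
    //       }
    //   }
    //
    //   struct TCollector {                  // hypothetical reader
    //       void Push(TThread::TId, const TLogItem&) {
    //       }
    //   };
    //   TCollector collector;                // reader side, serialized by log.Lock
    //   log.ReadItems(collector);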
    // Log that uses per-thread buffers to store items up to a given duration
    template <class T>
    class TDurationLogImpl: public TNonCopyable {
    public:
        using TLog = TDurationLogImpl;
        using TItem = T;

        class TAccessor;
        friend class TAccessor;
        class TAccessor: public TGuard<TSpinLock> {
        private:
            TLog& Log;

        public:
            explicit TAccessor(TLog& log)
                : TGuard<TSpinLock>(log.PerThreadStorage.Get().Lock)
                , Log(log)
            {
            }
            TItem* Add() {
                return Log.PerThreadStorage.Get().Add();
            }
        };

    private:
        class TStorage: public TDurationBuffer<TItem> {
        private:
            TLog* Log;
            TThread::TId ThreadId;
            ui64 EventsCount;

        public:
            TSpinLock Lock;

            TStorage()
                : TDurationBuffer<TItem>(TDuration::Zero())
                , Log(nullptr)
                , ThreadId(0)
                , EventsCount(0)
            {
            }
            explicit TStorage(TLog* log)
                : TDurationBuffer<TItem>(log->GetDuration())
                , Log(log)
                , ThreadId(TThread::CurrentThreadId())
                , EventsCount(0)
            {
                Log->RegisterThread(this);
            }
            ~TStorage() {
                if (Log) {
                    Log->UnregisterThread(this);
                }
            }
            void DetachFromTraceLog() {
                Log = nullptr;
            }
            TItem* Add() {
                EventsCount++;
                return TDurationBuffer<TItem>::Add();
            }
            bool Expired(ui64 now) const {
                return this->Empty() ? true : this->GetBack()->GetTimestampCycles() + this->StoreDuration < now;
            }
            TThread::TId GetThreadId() const {
                return ThreadId;
            }
            size_t GetEventsCount() const {
                return EventsCount;
            }
            void Swap(TStorage& other) {
                TDurationBuffer<TItem>::Swap(other);
                std::swap(Log, other.Log);
                std::swap(ThreadId, other.ThreadId);
                std::swap(EventsCount, other.EventsCount);
            }
            template <class TReader>
            void ReadItems(ui64 now, ui64 duration, TReader& r) const {
                TGuard<TSpinLock> g(Lock);
                if (now > duration) {
                    ui64 cutoff = now - duration;
                    for (const TItem& item : this->Data) {
                        if (item.GetTimestampCycles() >= cutoff) {
                            r.Push(ThreadId, item);
                        }
                    }
                } else {
                    for (const TItem& item : this->Data) {
                        r.Push(ThreadId, item);
                    }
                }
            }
        };

        TDuration Duration;
        Y_THREAD(TStorage)
        PerThreadStorage;
        TSpinLock Lock;
        typedef TVector<TAtomicSharedPtr<TStorage>> TOrphanStorages;
        TOrphanStorages OrphanStorages; // If a thread exits, its storage is destroyed, so we move it into OrphanStorages before destruction
        TAtomic OrphanStoragesEventsCount = 0;
        typedef TVector<TStorage*> TStoragesVec;
        TStoragesVec StoragesVec;
        TAtomic ThreadsCount;

    public:
        explicit TDurationLogImpl(TDuration duration)
            : Duration(duration)
            , PerThreadStorage(this)
            , ThreadsCount(0)
        {
        }
        ~TDurationLogImpl() {
            for (auto storage : StoragesVec) {
                // NOTE: Y_THREAD(TStorage) destructs the TStorage object for a specific thread only on that thread's exit
                // NOTE: this issue can lead to memory leaks if threads never exit and many TTraceLogs are created
                storage->Destroy();
                // The TraceLogStorage destructor can be called when TTraceLog has already been destroyed,
                // so we ensure this does not lead to problems.
                storage->DetachFromTraceLog();
            }
        }
        TDuration GetDuration() const {
            return Duration;
        }
        size_t GetEventsCount() const {
            size_t events = AtomicGet(OrphanStoragesEventsCount);
            TGuard<TSpinLock> g(Lock);
            for (auto i : StoragesVec) {
                events += i->GetEventsCount();
            }
            return events;
        }
        size_t GetThreadsCount() const {
            return AtomicGet(ThreadsCount);
        }
        void RegisterThread(TStorage* storage) {
            TGuard<TSpinLock> g(Lock);
            StoragesVec.push_back(storage);
            AtomicIncrement(ThreadsCount);
        }
        void UnregisterThread(TStorage* storage) {
            TGuard<TSpinLock> g(Lock);
            for (auto i = StoragesVec.begin(), e = StoragesVec.end(); i != e; ++i) {
                if (*i == storage) {
                    StoragesVec.erase(i);
                    break;
                }
            }
            TAtomicSharedPtr<TStorage> orphan(new TStorage());
            orphan->Swap(*storage);
            orphan->DetachFromTraceLog();
            AtomicAdd(OrphanStoragesEventsCount, orphan->GetEventsCount());
            OrphanStorages.push_back(orphan);
            CleanOrphanStorages(GetCycleCount());
        }
        void CleanOrphanStorages(ui64 now) {
            EraseIf(OrphanStorages, [=](const TAtomicSharedPtr<TStorage>& ptr) {
                const TStorage& storage = *ptr;
                return storage.Expired(now);
            });
        }
        template <class TReader>
        void ReadThreads(TReader& r) const {
            TGuard<TSpinLock> g(Lock);
            for (TStorage* i : StoragesVec) {
                r.PushThread(i->GetThreadId());
            }
            for (const auto& orphanStorage : OrphanStorages) {
                r.PushThread(orphanStorage->GetThreadId());
            }
        }
        template <class TReader>
        void ReadItems(ui64 now, ui64 duration, TReader& r) const {
            TGuard<TSpinLock> g(Lock);
            for (TStorage* storage : StoragesVec) {
                storage->ReadItems(now, duration, r);
            }
            for (const auto& orphanStorage : OrphanStorages) {
                orphanStorage->ReadItems(now, duration, r);
            }
        }
    };
    using TDurationLog = TDurationLogImpl<TLogItem>;
    using TDurationDepot = TDurationLogImpl<TTrackLog>;
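    // Usage sketch (illustrative): the duration-based logs take the current cycle
    // counter and the window of interest (in cycles) when reading; the reader type
    // is the same hypothetical TCollector as in the sketch above.
    //
    //   TDurationLog log(TDuration::Seconds(10));
    //   {
    //       TDurationLog::TAccessor a(log);  // holds the per-thread spinlock
    //       if (TLogItem* item = a.Add()) {
    //           item->TimestampCycles = GetCycleCount();
    //       }
    //   }
    //   TCollector collector;
    //   log.ReadItems(GetCycleCount(), DurationToCycles(TDuration::Seconds(1)), collector);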
    // Log that uses one cyclic buffer to store items
    // Each item is the result of the execution of some event
    class TInMemoryLog: public TNonCopyable {
    public:
        struct TItem {
            const TEvent* Event;
            TParams Params;
            TInstant Timestamp;

            TItem()
                : Event(nullptr)
            {
            }
            TItem(const TItem& other)
                : Event(other.Event)
                , Timestamp(other.Timestamp)
            {
                Clone(other);
            }
            ~TItem() {
                Destroy();
            }
            TItem& operator=(const TItem& other) {
                Destroy();
                Event = other.Event;
                Timestamp = other.Timestamp;
                Clone(other);
                return *this;
            }
            void Clear() {
                Destroy();
                Event = nullptr;
            }

        private:
            void Clone(const TItem& other) {
                if (Event && Event->Signature.ParamCount > 0) {
                    Event->Signature.CloneParams(Params, other.Params);
                }
            }
            void Destroy() {
                if (Event && Event->Signature.ParamCount > 0) {
                    Event->Signature.DestroyParams(Params);
                }
            }
        };

        class TAccessor;
        friend class TAccessor;
        class TAccessor: public TGuard<TMutex> {
        private:
            TInMemoryLog& Log;

        public:
            explicit TAccessor(TInMemoryLog& log)
                : TGuard<TMutex>(log.Lock)
                , Log(log)
            {
            }
            TItem* Add() {
                return Log.Storage.Add();
            }
        };

    private:
        TMutex Lock;
        TCyclicBuffer<TItem> Storage;

    public:
        explicit TInMemoryLog(size_t capacity)
            : Storage(capacity)
        {
        }
        template <class TReader>
        void ReadItems(TReader& r) const {
            TGuard<TMutex> g(Lock);
            if (Storage.GetSize() > 0) {
                for (const TItem *i = Storage.GetFront(), *e = Storage.GetBack();; Storage.Inc(i)) {
                    r.Push(*i);
                    if (i == e) {
                        break;
                    }
                }
            }
        }
    };
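    // Usage sketch (illustrative): unlike the per-thread logs above, TInMemoryLog
    // serializes all writers with a single mutex and keeps items in one shared
    // cyclic buffer; its reader only needs a Push(const TItem&) method.
    //
    //   TInMemoryLog log(256);
    //   {
    //       TInMemoryLog::TAccessor a(log);  // holds log.Lock
    //       if (TInMemoryLog::TItem* item = a.Add()) {
    //           item->Timestamp = TInstant::Now();
    //       }
    //   }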
#ifndef LWTRACE_DISABLE
    // Class representing a specific event
    template <LWTRACE_TEMPLATE_PARAMS>
    struct TUserEvent {
        TEvent Event;

        inline void operator()(TInMemoryLog& log, bool logTimestamp, LWTRACE_FUNCTION_PARAMS) const {
            TInMemoryLog::TAccessor la(log);
            if (TInMemoryLog::TItem* item = la.Add()) {
                item->Event = &Event;
                LWTRACE_PREPARE_PARAMS(item->Params);
                if (logTimestamp) {
                    item->Timestamp = TInstant::Now();
                }
            }
        }
    };
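    // Illustrative note (not part of the original header): TUserEvent is presumably
    // instantiated by the LWTRACE macros defined in probe.h; invoking it grabs a
    // TInMemoryLog accessor, records the event pointer and its parameters via
    // LWTRACE_PREPARE_PARAMS, and optionally stamps the wall-clock time.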
#endif
}