log.h 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906
  1. #pragma once
  2. #include "probe.h"
  3. #include <util/datetime/base.h>
  4. #include <util/generic/algorithm.h>
  5. #include <util/generic/deque.h>
  6. #include <util/generic/noncopyable.h>
  7. #include <util/generic/vector.h>
  8. #include <util/string/printf.h>
  9. #include <util/system/atomic.h>
  10. #include <util/system/hp_timer.h>
  11. #include <util/system/mutex.h>
  12. #include <util/system/spinlock.h>
  13. #include <util/system/thread.h>
  14. #include <util/system/tls.h>
  15. namespace NLWTrace {
  16. // Cyclic buffer that pushes items to its back and pop item from front on overflow
  17. template <class TItem>
  18. class TCyclicBuffer: public TNonCopyable {
  19. private:
  20. TVector<TItem> Data;
  21. TItem* Front; // Points to the first item (valid iff Size > 0)
  22. TItem* Back; // Points to the last item (valid iff Size > 0)
  23. size_t Size; // Number of items in the buffer
  24. TItem* First() {
  25. return &*Data.begin();
  26. }
  27. TItem* Last() {
  28. return &*Data.end();
  29. }
  30. const TItem* First() const {
  31. return &*Data.begin();
  32. }
  33. const TItem* Last() const {
  34. return &*Data.end();
  35. }
  36. public:
  37. explicit TCyclicBuffer(size_t capacity)
  38. : Data(capacity)
  39. , Size(0)
  40. {
  41. }
  42. TItem* Add() {
  43. if (Size != 0) {
  44. Inc(Back);
  45. if (Back == Front) {
  46. Inc(Front); // Forget (pop_front) old items
  47. } else {
  48. Size++;
  49. }
  50. } else {
  51. Front = Back = First();
  52. Size = 1;
  53. }
  54. Back->Clear();
  55. return Back;
  56. }
  57. TItem* GetFront() {
  58. return Front;
  59. }
  60. TItem* GetBack() {
  61. return Back;
  62. }
  63. const TItem* GetFront() const {
  64. return Front;
  65. }
  66. const TItem* GetBack() const {
  67. return Back;
  68. }
  69. size_t GetSize() const {
  70. return Size;
  71. }
  72. bool IsFull() const {
  73. return Size == Data.size();
  74. }
  75. void Inc(TItem*& it) {
  76. it++;
  77. if (it == Last()) {
  78. it = First();
  79. }
  80. }
  81. void Inc(const TItem*& it) const {
  82. it++;
  83. if (it == Last()) {
  84. it = First();
  85. }
  86. }
  87. void Destroy() {
  88. Data.clear();
  89. Size = 0;
  90. }
  91. void Clear() {
  92. Size = 0;
  93. }
  94. void Swap(TCyclicBuffer& other) {
  95. Data.swap(other.Data);
  96. std::swap(Front, other.Front);
  97. std::swap(Back, other.Back);
  98. std::swap(Size, other.Size);
  99. }
  100. };
  101. // Buffer that pushes items to its back and pop item from front on expire
  102. template <class TItem>
  103. class TDurationBuffer: public TNonCopyable {
  104. protected:
  105. TDeque<TItem> Data;
  106. ui64 StoreDuration;
  107. ui8 CleanupCounter = 0;
  108. public:
  109. explicit TDurationBuffer(TDuration duration)
  110. : StoreDuration(DurationToCycles(duration))
  111. {
  112. }
  113. TItem* Add() {
  114. if (!CleanupCounter) {
  115. Cleanup();
  116. CleanupCounter = 128; // Make cleanup after every 128 additions
  117. }
  118. CleanupCounter--;
  119. Data.emplace_back();
  120. return &Data.back();
  121. }
  122. TItem* GetFront() {
  123. return &Data.front();
  124. }
  125. TItem* GetBack() {
  126. return &Data.back();
  127. }
  128. const TItem* GetFront() const {
  129. return &Data.front();
  130. }
  131. const TItem* GetBack() const {
  132. return &Data.back();
  133. }
  134. size_t GetSize() const {
  135. return Data.size();
  136. }
  137. bool Empty() const {
  138. return Data.empty();
  139. }
  140. void Destroy() {
  141. Data.clear();
  142. }
  143. void Swap(TDurationBuffer& other) {
  144. Data.swap(other.Data);
  145. std::swap(StoreDuration, other.StoreDuration);
  146. }
  147. private:
  148. void Cleanup() {
  149. ui64 cutoff = GetCycleCount();
  150. if (cutoff > StoreDuration) {
  151. cutoff -= StoreDuration;
  152. while (!Data.empty() && Data.front().GetTimestampCycles() < cutoff) {
  153. Data.pop_front();
  154. }
  155. }
  156. }
  157. };
  158. struct TLogItem {
  159. TProbe* Probe = nullptr;
  160. TParams Params;
  161. size_t SavedParamsCount;
  162. TInstant Timestamp;
  163. ui64 TimestampCycles;
  164. TLogItem() {
  165. }
  166. TLogItem(const TLogItem& other)
  167. : Probe(other.Probe)
  168. , SavedParamsCount(other.SavedParamsCount)
  169. , Timestamp(other.Timestamp)
  170. , TimestampCycles(other.TimestampCycles)
  171. {
  172. Clone(other);
  173. }
  174. ~TLogItem() {
  175. Destroy();
  176. }
  177. TLogItem& operator=(const TLogItem& other) {
  178. Destroy();
  179. Probe = other.Probe;
  180. SavedParamsCount = other.SavedParamsCount;
  181. Timestamp = other.Timestamp;
  182. TimestampCycles = other.TimestampCycles;
  183. Clone(other);
  184. return *this;
  185. }
  186. void Clear() {
  187. Destroy();
  188. Probe = nullptr;
  189. }
  190. void ToProtobuf(TLogItemPb& pb) const {
  191. pb.SetName(Probe->Event.Name);
  192. pb.SetProvider(Probe->Event.GetProvider());
  193. if (SavedParamsCount > 0) {
  194. TString paramValues[LWTRACE_MAX_PARAMS];
  195. Probe->Event.Signature.SerializeParams(Params, paramValues);
  196. for (size_t pi = 0; pi < SavedParamsCount; pi++) {
  197. pb.AddParams(paramValues[pi]);
  198. }
  199. }
  200. pb.SetTimestamp(Timestamp.GetValue());
  201. pb.SetTimestampCycles(TimestampCycles);
  202. }
  203. TTypedParam GetParam(const TString& param) const {
  204. if (SavedParamsCount == 0) {
  205. return TTypedParam();
  206. } else {
  207. size_t idx = Probe->Event.Signature.FindParamIndex(param);
  208. if (idx >= SavedParamsCount) { // Also covers idx=-1 case (not found)
  209. return TTypedParam();
  210. } else {
  211. EParamTypePb type = ParamTypeToProtobuf(Probe->Event.Signature.ParamTypes[idx]);
  212. return TTypedParam(type, Params.Param[idx]);
  213. }
  214. }
  215. }
  216. ui64 GetTimestampCycles() const {
  217. return TimestampCycles;
  218. }
  219. private:
  220. void Clone(const TLogItem& other) {
  221. if (Probe && SavedParamsCount > 0) {
  222. Probe->Event.Signature.CloneParams(Params, other.Params);
  223. }
  224. }
  225. void Destroy() {
  226. if (Probe && SavedParamsCount > 0) {
  227. Probe->Event.Signature.DestroyParams(Params);
  228. }
  229. }
  230. };
  231. struct TTrackLog {
  232. struct TItem : TLogItem {
  233. TThread::TId ThreadId;
  234. TItem() = default;
  235. TItem(TThread::TId tid, const TLogItem& item)
  236. : TLogItem(item)
  237. , ThreadId(tid)
  238. {
  239. }
  240. };
  241. using TItems = TVector<TItem>;
  242. TItems Items;
  243. bool Truncated = false;
  244. ui64 Id = 0;
  245. void Clear() {
  246. Items.clear();
  247. Truncated = false;
  248. }
  249. ui64 GetTimestampCycles() const {
  250. return Items.empty() ? 0 : Items.front().GetTimestampCycles();
  251. }
  252. };
  253. // Log that uses per-thread cyclic buffers to store items
  254. template <class T>
  255. class TCyclicLogImpl: public TNonCopyable {
  256. public:
  257. using TLog = TCyclicLogImpl;
  258. using TItem = T;
  259. private:
  260. using TBuffer = TCyclicBuffer<TItem>;
  261. class TStorage {
  262. private:
  263. // Data that can be accessed in lock-free way from reader/writer
  264. TAtomic Writers = 0;
  265. mutable TBuffer* volatile CurBuffer = nullptr;
  266. // Data that can be accessed only from reader
  267. // NOTE: multiple readers are serialized by TCyclicLogImpl::Lock
  268. mutable TBuffer* OldBuffer = nullptr;
  269. mutable TBuffer* NewBuffer = nullptr;
  270. TLog* volatile Log = nullptr;
  271. TThread::TId ThreadId = 0;
  272. TAtomic EventsCount = 0;
  273. public:
  274. TStorage() {
  275. }
  276. explicit TStorage(TLog* log)
  277. : CurBuffer(new TBuffer(log->GetCapacity()))
  278. , OldBuffer(new TBuffer(log->GetCapacity()))
  279. , NewBuffer(new TBuffer(log->GetCapacity()))
  280. , Log(log)
  281. , ThreadId(TThread::CurrentThreadId())
  282. {
  283. Log->RegisterThread(this);
  284. }
  285. ~TStorage() {
  286. if (TLog* log = AtomicSwap(&Log, nullptr)) {
  287. AtomicBarrier(); // Serialize `Log' and TCyclicLogImpl::Lock memory order
  288. // NOTE: the following function swaps `this' with `new TStorage()'
  289. log->UnregisterThreadAndMakeOrphan(this);
  290. } else {
  291. // NOTE: `Log' can be nullptr if either it is orphan storage or TryDismiss() succeeded
  292. // NOTE: in both cases it is ok to call these deletes
  293. delete CurBuffer;
  294. delete OldBuffer;
  295. delete NewBuffer;
  296. }
  297. }
  298. bool TryDismiss() {
  299. // TCyclicLogImpl::Lock implied (no readers)
  300. if (TLog* log = AtomicSwap(&Log, nullptr)) {
  301. TBuffer* curBuffer = AtomicSwap(&CurBuffer, nullptr);
  302. WaitForWriters();
  303. // At this point we guarantee that there is no and wont be active writer
  304. delete curBuffer;
  305. delete OldBuffer;
  306. delete NewBuffer;
  307. OldBuffer = nullptr;
  308. NewBuffer = nullptr;
  309. return true;
  310. } else {
  311. // ~TStorage() is in progress
  312. return false;
  313. }
  314. }
  315. void WaitForWriters() const {
  316. while (AtomicGet(Writers) > 0) {
  317. SpinLockPause();
  318. }
  319. }
  320. TThread::TId GetThreadId() const {
  321. // TCyclicLogImpl::Lock implied (no readers)
  322. return ThreadId;
  323. }
  324. size_t GetEventsCount() const {
  325. // TCyclicLogImpl::Lock implied (no readers)
  326. return AtomicGet(EventsCount);
  327. }
  328. void Swap(TStorage& other) {
  329. // TCyclicLogImpl::Lock implied (no readers)
  330. std::swap(CurBuffer, other.CurBuffer);
  331. std::swap(OldBuffer, other.OldBuffer);
  332. std::swap(NewBuffer, other.NewBuffer);
  333. std::swap(Log, other.Log);
  334. std::swap(ThreadId, other.ThreadId);
  335. std::swap(EventsCount, other.EventsCount);
  336. }
  337. TBuffer* StartWriter() {
  338. AtomicIncrement(Writers);
  339. return const_cast<TBuffer*>(AtomicGet(CurBuffer));
  340. }
  341. void StopWriter() {
  342. AtomicDecrement(Writers);
  343. }
  344. void IncEventsCount() {
  345. AtomicIncrement(EventsCount);
  346. }
  347. template <class TReader>
  348. void ReadItems(TReader& r) const {
  349. // TCyclicLogImpl::Lock implied
  350. NewBuffer = AtomicSwap(&CurBuffer, NewBuffer);
  351. WaitForWriters();
  352. // Merge new buffer into old buffer
  353. if (NewBuffer->IsFull()) {
  354. std::swap(NewBuffer, OldBuffer);
  355. } else {
  356. if (NewBuffer->GetSize() > 0) {
  357. for (const TItem *i = NewBuffer->GetFront(), *e = NewBuffer->GetBack();; NewBuffer->Inc(i)) {
  358. TItem* oldSlot = OldBuffer->Add();
  359. *oldSlot = *i;
  360. if (i == e) {
  361. break;
  362. }
  363. }
  364. }
  365. }
  366. NewBuffer->Clear();
  367. // Iterate over old buffer
  368. if (OldBuffer->GetSize() > 0) {
  369. for (const TItem *i = OldBuffer->GetFront(), *e = OldBuffer->GetBack();; OldBuffer->Inc(i)) {
  370. r.Push(ThreadId, *i);
  371. if (i == e) {
  372. break;
  373. }
  374. }
  375. }
  376. }
  377. };
  378. size_t Capacity;
  379. Y_THREAD(TStorage)
  380. PerThreadStorage;
  381. TSpinLock Lock;
  382. // If thread exits its storage is destroyed, so we move it into OrphanStorages before destruction
  383. TVector<TAtomicSharedPtr<TStorage>> OrphanStorages;
  384. typedef TVector<TStorage*> TStoragesVec;
  385. TStoragesVec StoragesVec;
  386. TAtomic ThreadsCount;
  387. public:
  388. explicit TCyclicLogImpl(size_t capacity)
  389. : Capacity(capacity)
  390. , PerThreadStorage(this)
  391. , ThreadsCount(0)
  392. {
  393. }
  394. ~TCyclicLogImpl() {
  395. for (bool again = true; again;) {
  396. TGuard<TSpinLock> g(Lock);
  397. AtomicBarrier(); // Serialize `storage->Log' and Lock memory order
  398. again = false;
  399. while (!StoragesVec.empty()) {
  400. TStorage* storage = StoragesVec.back();
  401. // TStorage destructor can be called when TCyclicLogImpl is already destructed
  402. // So we ensure this does not lead to problems
  403. // NOTE: Y_THREAD(TStorage) destructs TStorage object for a specific thread only on that thread exit
  404. // NOTE: this issue can lead to memleaks if threads never exit and many TCyclicLogImpl are created
  405. if (storage->TryDismiss()) {
  406. StoragesVec.pop_back();
  407. } else {
  408. // Rare case when another thread is running ~TStorage() -- let it finish
  409. again = true;
  410. SpinLockPause();
  411. break;
  412. }
  413. }
  414. }
  415. }
  416. size_t GetCapacity() const {
  417. return Capacity;
  418. }
  419. size_t GetEventsCount() const {
  420. size_t events = 0;
  421. TGuard<TSpinLock> g(Lock);
  422. for (auto i : StoragesVec) {
  423. events += i->GetEventsCount();
  424. }
  425. for (const auto& orphanStorage : OrphanStorages) {
  426. events += orphanStorage->GetEventsCount();
  427. }
  428. return events;
  429. }
  430. size_t GetThreadsCount() const {
  431. return AtomicGet(ThreadsCount);
  432. }
  433. void RegisterThread(TStorage* storage) {
  434. TGuard<TSpinLock> g(Lock);
  435. StoragesVec.push_back(storage);
  436. AtomicIncrement(ThreadsCount);
  437. }
  438. void UnregisterThreadAndMakeOrphan(TStorage* storage) {
  439. TGuard<TSpinLock> g(Lock);
  440. // `storage' writers are not possible at this scope because
  441. // UnregisterThreadAndMakeOrphan is only called from exiting threads.
  442. // `storage' readers are not possible at this scope due to Lock guard.
  443. Erase(StoragesVec, storage);
  444. TAtomicSharedPtr<TStorage> orphan(new TStorage());
  445. orphan->Swap(*storage); // Swap is required because we cannot take ownership from Y_THREAD(TStorage) object
  446. OrphanStorages.push_back(orphan);
  447. }
  448. template <class TReader>
  449. void ReadThreads(TReader& r) const {
  450. TGuard<TSpinLock> g(Lock);
  451. for (auto i : StoragesVec) {
  452. r.PushThread(i->GetThreadId());
  453. }
  454. for (const auto& orphanStorage : OrphanStorages) {
  455. r.PushThread(orphanStorage->GetThreadId());
  456. }
  457. }
  458. template <class TReader>
  459. void ReadItems(TReader& r) const {
  460. TGuard<TSpinLock> g(Lock);
  461. for (auto i : StoragesVec) {
  462. i->ReadItems(r);
  463. }
  464. for (const auto& orphanStorage : OrphanStorages) {
  465. orphanStorage->ReadItems(r);
  466. }
  467. }
  468. class TAccessor {
  469. private:
  470. TStorage& Storage;
  471. TBuffer* Buffer;
  472. public:
  473. explicit TAccessor(TLog& log)
  474. : Storage(log.PerThreadStorage.Get())
  475. , Buffer(Storage.StartWriter())
  476. {
  477. }
  478. ~TAccessor() {
  479. Storage.StopWriter();
  480. }
  481. TItem* Add() {
  482. if (Buffer) {
  483. Storage.IncEventsCount();
  484. return Buffer->Add();
  485. } else {
  486. // TStorage detached from trace due to trace destruction
  487. // so we should not try log anything
  488. return nullptr;
  489. }
  490. }
  491. };
  492. friend class TAccessor;
  493. };
  494. using TCyclicLog = TCyclicLogImpl<TLogItem>;
  495. using TCyclicDepot = TCyclicLogImpl<TTrackLog>;
  496. // Log that uses per-thread buffers to store items up to given duration
  497. template <class T>
  498. class TDurationLogImpl: public TNonCopyable {
  499. public:
  500. using TLog = TDurationLogImpl;
  501. using TItem = T;
  502. class TAccessor;
  503. friend class TAccessor;
  504. class TAccessor: public TGuard<TSpinLock> {
  505. private:
  506. TLog& Log;
  507. public:
  508. explicit TAccessor(TLog& log)
  509. : TGuard<TSpinLock>(log.PerThreadStorage.Get().Lock)
  510. , Log(log)
  511. {
  512. }
  513. TItem* Add() {
  514. return Log.PerThreadStorage.Get().Add();
  515. }
  516. };
  517. private:
  518. class TStorage: public TDurationBuffer<TItem> {
  519. private:
  520. TLog* Log;
  521. TThread::TId ThreadId;
  522. ui64 EventsCount;
  523. public:
  524. TSpinLock Lock;
  525. TStorage()
  526. : TDurationBuffer<TItem>(TDuration::Zero())
  527. , Log(nullptr)
  528. , ThreadId(0)
  529. , EventsCount(0)
  530. {
  531. }
  532. explicit TStorage(TLog* log)
  533. : TDurationBuffer<TItem>(log->GetDuration())
  534. , Log(log)
  535. , ThreadId(TThread::CurrentThreadId())
  536. , EventsCount(0)
  537. {
  538. Log->RegisterThread(this);
  539. }
  540. ~TStorage() {
  541. if (Log) {
  542. Log->UnregisterThread(this);
  543. }
  544. }
  545. void DetachFromTraceLog() {
  546. Log = nullptr;
  547. }
  548. TItem* Add() {
  549. EventsCount++;
  550. return TDurationBuffer<TItem>::Add();
  551. }
  552. bool Expired(ui64 now) const {
  553. return this->Empty() ? true : this->GetBack()->GetTimestampCycles() + this->StoreDuration < now;
  554. }
  555. TThread::TId GetThreadId() const {
  556. return ThreadId;
  557. }
  558. size_t GetEventsCount() const {
  559. return EventsCount;
  560. }
  561. void Swap(TStorage& other) {
  562. TDurationBuffer<TItem>::Swap(other);
  563. std::swap(Log, other.Log);
  564. std::swap(ThreadId, other.ThreadId);
  565. std::swap(EventsCount, other.EventsCount);
  566. }
  567. template <class TReader>
  568. void ReadItems(ui64 now, ui64 duration, TReader& r) const {
  569. TGuard<TSpinLock> g(Lock);
  570. if (now > duration) {
  571. ui64 cutoff = now - duration;
  572. for (const TItem& item : this->Data) {
  573. if (item.GetTimestampCycles() >= cutoff) {
  574. r.Push(ThreadId, item);
  575. }
  576. }
  577. } else {
  578. for (const TItem& item : this->Data) {
  579. r.Push(ThreadId, item);
  580. }
  581. }
  582. }
  583. };
  584. TDuration Duration;
  585. Y_THREAD(TStorage)
  586. PerThreadStorage;
  587. TSpinLock Lock;
  588. typedef TVector<TAtomicSharedPtr<TStorage>> TOrphanStorages;
  589. TOrphanStorages OrphanStorages; // if thread exits its storage is destroyed, so we move it into OrphanStorages before destruction
  590. TAtomic OrphanStoragesEventsCount = 0;
  591. typedef TVector<TStorage*> TStoragesVec;
  592. TStoragesVec StoragesVec;
  593. TAtomic ThreadsCount;
  594. public:
  595. explicit TDurationLogImpl(TDuration duration)
  596. : Duration(duration)
  597. , PerThreadStorage(this)
  598. , ThreadsCount(0)
  599. {
  600. }
  601. ~TDurationLogImpl() {
  602. for (auto storage : StoragesVec) {
  603. // NOTE: Y_THREAD(TStorage) destructs TStorage object for a specific thread only on that thread exit
  604. // NOTE: this issue can lead to memleaks if threads never exit and many TTraceLogs are created
  605. storage->Destroy();
  606. // TraceLogStorage destructor can be called when TTraceLog is already destructed
  607. // So we ensure this does not lead to problems
  608. storage->DetachFromTraceLog();
  609. }
  610. }
  611. TDuration GetDuration() const {
  612. return Duration;
  613. }
  614. size_t GetEventsCount() const {
  615. size_t events = AtomicGet(OrphanStoragesEventsCount);
  616. TGuard<TSpinLock> g(Lock);
  617. for (auto i : StoragesVec) {
  618. events += i->GetEventsCount();
  619. }
  620. return events;
  621. }
  622. size_t GetThreadsCount() const {
  623. return AtomicGet(ThreadsCount);
  624. }
  625. void RegisterThread(TStorage* storage) {
  626. TGuard<TSpinLock> g(Lock);
  627. StoragesVec.push_back(storage);
  628. AtomicIncrement(ThreadsCount);
  629. }
  630. void UnregisterThread(TStorage* storage) {
  631. TGuard<TSpinLock> g(Lock);
  632. for (auto i = StoragesVec.begin(), e = StoragesVec.end(); i != e; ++i) {
  633. if (*i == storage) {
  634. StoragesVec.erase(i);
  635. break;
  636. }
  637. }
  638. TAtomicSharedPtr<TStorage> orphan(new TStorage());
  639. orphan->Swap(*storage);
  640. orphan->DetachFromTraceLog();
  641. AtomicAdd(OrphanStoragesEventsCount, orphan->GetEventsCount());
  642. OrphanStorages.push_back(orphan);
  643. CleanOrphanStorages(GetCycleCount());
  644. }
  645. void CleanOrphanStorages(ui64 now) {
  646. EraseIf(OrphanStorages, [=](const TAtomicSharedPtr<TStorage>& ptr) {
  647. const TStorage& storage = *ptr;
  648. return storage.Expired(now);
  649. });
  650. }
  651. template <class TReader>
  652. void ReadThreads(TReader& r) const {
  653. TGuard<TSpinLock> g(Lock);
  654. for (TStorage* i : StoragesVec) {
  655. r.PushThread(i->GetThreadId());
  656. }
  657. for (const auto& orphanStorage : OrphanStorages) {
  658. r.PushThread(orphanStorage->GetThreadId());
  659. }
  660. }
  661. template <class TReader>
  662. void ReadItems(ui64 now, ui64 duration, TReader& r) const {
  663. TGuard<TSpinLock> g(Lock);
  664. for (TStorage* storage : StoragesVec) {
  665. storage->ReadItems(now, duration, r);
  666. }
  667. for (const auto& orphanStorage : OrphanStorages) {
  668. orphanStorage->ReadItems(now, duration, r);
  669. }
  670. }
  671. };
  672. using TDurationLog = TDurationLogImpl<TLogItem>;
  673. using TDurationDepot = TDurationLogImpl<TTrackLog>;
  674. // Log that uses one cyclic buffer to store items
  675. // Each item is a result of execution of some event
  676. class TInMemoryLog: public TNonCopyable {
  677. public:
  678. struct TItem {
  679. const TEvent* Event;
  680. TParams Params;
  681. TInstant Timestamp;
  682. TItem()
  683. : Event(nullptr)
  684. {
  685. }
  686. TItem(const TItem& other)
  687. : Event(other.Event)
  688. , Timestamp(other.Timestamp)
  689. {
  690. Clone(other);
  691. }
  692. ~TItem() {
  693. Destroy();
  694. }
  695. TItem& operator=(const TItem& other) {
  696. Destroy();
  697. Event = other.Event;
  698. Timestamp = other.Timestamp;
  699. Clone(other);
  700. return *this;
  701. }
  702. void Clear() {
  703. Destroy();
  704. Event = nullptr;
  705. }
  706. private:
  707. void Clone(const TItem& other) {
  708. if (Event && Event->Signature.ParamCount > 0) {
  709. Event->Signature.CloneParams(Params, other.Params);
  710. }
  711. }
  712. void Destroy() {
  713. if (Event && Event->Signature.ParamCount > 0) {
  714. Event->Signature.DestroyParams(Params);
  715. }
  716. }
  717. };
  718. class TAccessor;
  719. friend class TAccessor;
  720. class TAccessor: public TGuard<TMutex> {
  721. private:
  722. TInMemoryLog& Log;
  723. public:
  724. explicit TAccessor(TInMemoryLog& log)
  725. : TGuard<TMutex>(log.Lock)
  726. , Log(log)
  727. {
  728. }
  729. TItem* Add() {
  730. return Log.Storage.Add();
  731. }
  732. };
  733. private:
  734. TMutex Lock;
  735. TCyclicBuffer<TItem> Storage;
  736. public:
  737. explicit TInMemoryLog(size_t capacity)
  738. : Storage(capacity)
  739. {
  740. }
  741. template <class TReader>
  742. void ReadItems(TReader& r) const {
  743. TGuard<TMutex> g(Lock);
  744. if (Storage.GetSize() > 0) {
  745. for (const TItem *i = Storage.GetFront(), *e = Storage.GetBack();; Storage.Inc(i)) {
  746. r.Push(*i);
  747. if (i == e) {
  748. break;
  749. }
  750. }
  751. }
  752. }
  753. };
  754. #ifndef LWTRACE_DISABLE
  755. // Class representing a specific event
  756. template <LWTRACE_TEMPLATE_PARAMS>
  757. struct TUserEvent {
  758. TEvent Event;
  759. inline void operator()(TInMemoryLog& log, bool logTimestamp, LWTRACE_FUNCTION_PARAMS) const {
  760. TInMemoryLog::TAccessor la(log);
  761. if (TInMemoryLog::TItem* item = la.Add()) {
  762. item->Event = &Event;
  763. LWTRACE_PREPARE_PARAMS(item->Params);
  764. if (logTimestamp) {
  765. item->Timestamp = TInstant::Now();
  766. }
  767. }
  768. }
  769. };
  770. #endif
  771. }