shuttle.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. #pragma once
  2. #include "event.h"
  3. #include <library/cpp/containers/stack_vector/stack_vec.h>
  4. #include <library/cpp/deprecated/atomic/atomic.h>
  5. #include <util/generic/ptr.h>
  6. #include <util/system/spinlock.h>
  7. #include <algorithm>
  8. #include <type_traits>
  9. namespace NLWTrace {
  10. struct TProbe;
  11. class IShuttle;
  12. using TShuttlePtr = TIntrusivePtr<IShuttle>;
  13. // Represents interface of tracing context passing by application between probes
  14. class alignas(8) IShuttle: public TThrRefBase {
  15. private:
  16. ui64 TraceIdx;
  17. ui64 SpanId;
  18. ui64 ParentSpanId = 0;
  19. TAtomic Status = 0;
  20. static constexpr ui64 DeadFlag = 0x1ull;
  21. TShuttlePtr Next;
  22. public:
  23. explicit IShuttle(ui64 traceIdx, ui64 spanId)
  24. : TraceIdx(traceIdx)
  25. , SpanId(spanId)
  26. {
  27. }
  28. virtual ~IShuttle() {
  29. }
  30. ui64 GetTraceIdx() const {
  31. return TraceIdx;
  32. }
  33. ui64 GetSpanId() const {
  34. return SpanId;
  35. }
  36. ui64 GetParentSpanId() const {
  37. return ParentSpanId;
  38. }
  39. void SetParentSpanId(ui64 parentSpanId) {
  40. ParentSpanId = parentSpanId;
  41. }
  42. template <class F, class R>
  43. R UnlessDead(F func, R dead) {
  44. while (true) {
  45. ui64 status = AtomicGet(Status);
  46. if (status & DeadFlag) {
  47. return dead;
  48. }
  49. if (AtomicCas(&Status, status + 2, status)) {
  50. R result = func();
  51. ATOMIC_COMPILER_BARRIER();
  52. AtomicSub(Status, 2);
  53. return result;
  54. }
  55. }
  56. }
  57. template <class F>
  58. void UnlessDead(F func) {
  59. while (true) {
  60. ui64 status = AtomicGet(Status);
  61. if (status & DeadFlag) {
  62. return;
  63. }
  64. if (AtomicCas(&Status, status + 2, status)) {
  65. func();
  66. ATOMIC_COMPILER_BARRIER();
  67. AtomicSub(Status, 2);
  68. return;
  69. }
  70. }
  71. }
  72. void Serialize(TShuttleTrace& msg) {
  73. UnlessDead([&] {
  74. DoSerialize(msg);
  75. });
  76. }
  77. // Returns false iff shuttle should be destroyed
  78. bool AddProbe(TProbe* probe, const TParams& params, ui64 timestamp = 0) {
  79. return UnlessDead([&] {
  80. return DoAddProbe(probe, params, timestamp);
  81. }, false);
  82. }
  83. // Track object was destroyed
  84. void EndOfTrack() {
  85. if (Next) {
  86. Next->EndOfTrack();
  87. Next.Reset();
  88. }
  89. UnlessDead([&] {
  90. DoEndOfTrack();
  91. });
  92. }
  93. // Shuttle was dropped from orbit
  94. TShuttlePtr Drop() {
  95. UnlessDead([&] {
  96. DoDrop();
  97. });
  98. return Detach(); // Detached from orbit on Drop
  99. }
  100. TShuttlePtr Detach() {
  101. TShuttlePtr result;
  102. result.Swap(Next);
  103. return result;
  104. }
  105. // Trace session was destroyed
  106. void Kill() {
  107. AtomicAdd(Status, 1); // Sets DeadFlag
  108. while (AtomicGet(Status) != 1) { // Wait until any vcall is over
  109. SpinLockPause();
  110. }
  111. // After this point all virtual calls on shuttle are disallowed
  112. ATOMIC_COMPILER_BARRIER();
  113. }
  114. const TShuttlePtr& GetNext() const {
  115. return Next;
  116. }
  117. TShuttlePtr& GetNext() {
  118. return Next;
  119. }
  120. void SetNext(const TShuttlePtr& next) {
  121. Next = next;
  122. }
  123. bool Fork(TShuttlePtr& child) {
  124. return UnlessDead([&] {
  125. return DoFork(child);
  126. }, true);
  127. }
  128. bool Join(TShuttlePtr& child) {
  129. return UnlessDead([&] {
  130. return DoJoin(child);
  131. }, false);
  132. }
  133. bool IsDead() {
  134. return AtomicGet(Status) & DeadFlag;
  135. }
  136. protected:
  137. virtual bool DoAddProbe(TProbe* probe, const TParams& params, ui64 timestamp) = 0;
  138. virtual void DoEndOfTrack() = 0;
  139. virtual void DoDrop() = 0;
  140. virtual void DoSerialize(TShuttleTrace& msg) = 0;
  141. virtual bool DoFork(TShuttlePtr& child) = 0;
  142. virtual bool DoJoin(const TShuttlePtr& child) = 0;
  143. };
  144. // Not thread-safe orbit
  145. // Orbit should not be used concurrenty, lock is used
  146. // to ensure this is the case and avoid race condition if not.
  147. class TOrbit {
  148. private:
  149. TShuttlePtr HeadNoLock;
  150. public:
  151. TOrbit() = default;
  152. TOrbit(const TOrbit&) = delete;
  153. TOrbit(TOrbit&&) = default;
  154. TOrbit& operator=(const TOrbit&) = delete;
  155. TOrbit& operator=(TOrbit&&) = default;
  156. ~TOrbit() {
  157. Reset();
  158. }
  159. void Reset() {
  160. NotConcurrent([] (TShuttlePtr& head) {
  161. if (head) {
  162. head->EndOfTrack();
  163. head.Reset();
  164. }
  165. });
  166. }
  167. // Checks if there is at least one shuttle in orbit
  168. // NOTE: used by every LWTRACK macro check, so keep it optimized - do not lock
  169. bool HasShuttles() const {
  170. return HeadNoLock.Get();
  171. }
  172. void AddShuttle(const TShuttlePtr& shuttle) {
  173. NotConcurrent([&] (TShuttlePtr& head) {
  174. Y_ABORT_UNLESS(!shuttle->GetNext());
  175. shuttle->SetNext(head);
  176. head = shuttle;
  177. });
  178. }
  179. // Moves every shuttle from `orbit' into this
  180. void Take(TOrbit& orbit) {
  181. NotConcurrent([&] (TShuttlePtr& head) {
  182. orbit.NotConcurrent([&] (TShuttlePtr& oHead) {
  183. TShuttlePtr* ref = &oHead;
  184. if (ref->Get()) {
  185. while (TShuttlePtr& next = (*ref)->GetNext()) {
  186. ref = &next;
  187. }
  188. (*ref)->SetNext(head);
  189. head.Swap(oHead);
  190. oHead.Reset();
  191. }
  192. });
  193. });
  194. }
  195. void AddProbe(TProbe* probe, const TParams& params, ui64 timestamp = 0) {
  196. NotConcurrent([&] (TShuttlePtr& head) {
  197. TShuttlePtr* ref = &head;
  198. while (IShuttle* s = ref->Get()) {
  199. if (!s->AddProbe(probe, params, timestamp)) { // Shuttle self-destructed
  200. *ref = s->Drop(); // May destroy shuttle
  201. } else {
  202. ref = &s->GetNext(); // Keep shuttle
  203. }
  204. }
  205. });
  206. }
  207. template <class TFunc>
  208. void ForEachShuttle(ui64 traceIdx, TFunc&& func) {
  209. NotConcurrent([&] (TShuttlePtr& head) {
  210. TShuttlePtr* ref = &head;
  211. while (IShuttle* s = ref->Get()) {
  212. if (s->GetTraceIdx() == traceIdx && !func(s)) { // Shuttle self-destructed
  213. *ref = s->Drop(); // May destroy shuttle
  214. } else {
  215. ref = &s->GetNext(); // Keep shuttle
  216. }
  217. }
  218. });
  219. }
  220. void Serialize(ui64 traceIdx, TShuttleTrace& msg) {
  221. ForEachShuttle(traceIdx, [&] (NLWTrace::IShuttle* shuttle) {
  222. shuttle->Serialize(msg);
  223. return false;
  224. });
  225. }
  226. bool HasShuttle(ui64 traceIdx) {
  227. return NotConcurrent([=] (TShuttlePtr& head) {
  228. TShuttlePtr ref = head;
  229. while (IShuttle* s = ref.Get()) {
  230. if (s->GetTraceIdx() == traceIdx) {
  231. return true;
  232. } else {
  233. ref = s->GetNext();
  234. }
  235. }
  236. return false;
  237. });
  238. }
  239. bool Fork(TOrbit& child) {
  240. return NotConcurrent([&] (TShuttlePtr& head) {
  241. return child.NotConcurrent([&] (TShuttlePtr& cHead) {
  242. bool result = true;
  243. TShuttlePtr* ref = &head;
  244. while (IShuttle* shuttle = ref->Get()) {
  245. if (shuttle->IsDead()) {
  246. *ref = shuttle->Drop();
  247. } else {
  248. result = result && shuttle->Fork(cHead);
  249. ref = &shuttle->GetNext();
  250. }
  251. }
  252. return result;
  253. });
  254. });
  255. }
  256. void Join(TOrbit& child) {
  257. NotConcurrent([&] (TShuttlePtr& head) {
  258. child.NotConcurrent([&] (TShuttlePtr& cHead) {
  259. TShuttlePtr* ref = &head;
  260. while (IShuttle* shuttle = ref->Get()) {
  261. if (shuttle->IsDead()) {
  262. *ref = shuttle->Drop();
  263. } else {
  264. child.Join(cHead, shuttle);
  265. ref = &shuttle->GetNext();
  266. }
  267. }
  268. });
  269. });
  270. }
  271. private:
  272. static void Join(TShuttlePtr& head, IShuttle* parent) {
  273. TShuttlePtr* ref = &head;
  274. while (IShuttle* child = ref->Get()) {
  275. if (parent->GetTraceIdx() == child->GetTraceIdx() && parent->GetSpanId() == child->GetParentSpanId()) {
  276. TShuttlePtr next = child->Detach();
  277. parent->Join(*ref);
  278. *ref = next;
  279. } else {
  280. ref = &child->GetNext();
  281. }
  282. }
  283. }
  284. template <class TFunc>
  285. typename std::invoke_result<TFunc, TShuttlePtr&>::type NotConcurrent(TFunc func) {
  286. // `HeadNoLock` is binary-copied into local `headCopy` and written with special `locked` value
  287. // during not concurrent operations. Not concurrent operations should not work
  288. // with `HeadNoLock` directly. Instead `headCopy` is passed into `func` by reference and
  289. // after `func()` it is binary-copied back into `HeadNoLock`.
  290. static_assert(sizeof(HeadNoLock) == sizeof(TAtomic));
  291. TAtomic* headPtr = reinterpret_cast<TAtomic*>(&HeadNoLock);
  292. TAtomicBase headCopy = AtomicGet(*headPtr);
  293. static constexpr TAtomicBase locked = 0x1ull;
  294. if (headCopy != locked && AtomicCas(headPtr, locked, headCopy)) {
  295. struct TUnlocker { // to avoid specialization for R=void
  296. TAtomic* HeadPtr;
  297. TAtomicBase* HeadCopy;
  298. ~TUnlocker() {
  299. ATOMIC_COMPILER_BARRIER();
  300. AtomicSet(*HeadPtr, *HeadCopy);
  301. }
  302. } scoped{headPtr, &headCopy};
  303. return func(*reinterpret_cast<TShuttlePtr*>(&headCopy));
  304. } else {
  305. LockFailed();
  306. return typename std::invoke_result<TFunc, TShuttlePtr&>::type();
  307. }
  308. }
  309. void LockFailed();
  310. };
  311. inline size_t HasShuttles(const TOrbit& orbit) {
  312. return orbit.HasShuttles();
  313. }
  314. }