hot_swap.h 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #pragma once
  2. #include <util/generic/cast.h>
  3. #include <util/generic/ptr.h>
  4. #include <util/generic/utility.h>
  5. #include <library/cpp/deprecated/atomic/atomic.h>
  6. #include <util/system/guard.h>
  7. #include <util/system/spinlock.h>
  8. #include <util/system/yassert.h>
  9. namespace NHotSwapPrivate {
  10. // Special guard object for THotSwap
  11. class TWriterLock {
  12. public:
  13. // Implements multi-lock wait-free interface for readers
  14. void Acquire() noexcept;
  15. void Release() noexcept;
  16. void WaitAllReaders() const noexcept;
  17. private:
  18. TAtomic ReadersCount = 0;
  19. };
  20. }
  21. /// Object container that can be switched to another such object concurrently.
  22. /// T must support atomic reference counting
  23. ///
  24. /// Typical usage is when we have rarely changed, but frequently used data.
  25. /// If we want to use reference counting, we can't concurrently change and read
  26. /// intrusive pointer without extra synchronization.
  27. /// This class provides such synchronization mechanism with minimal read time.
  28. ///
  29. ///
  30. /// Usage sample
  31. ///
  32. /// THotSwap<T> Obj;
  33. ///
  34. /// thread 1:
  35. /// ...
  36. /// TIntrusivePtr<T> obj = Obj.AtomicLoad(); // get current object
  37. /// ... use of obj
  38. ///
  39. /// thread 2:
  40. /// ...
  41. /// Obj.AtomicStore(new T()); // set new object
  42. ///
  43. template <class T, class Ops = TDefaultIntrusivePtrOps<T>>
  44. class THotSwap {
  45. public:
  46. using TPtr = TIntrusivePtr<T, Ops>;
  47. public:
  48. THotSwap() noexcept {
  49. }
  50. explicit THotSwap(T* p) noexcept {
  51. AtomicStore(p);
  52. }
  53. explicit THotSwap(const TPtr& p) noexcept
  54. : THotSwap(p.Get())
  55. {
  56. }
  57. THotSwap(const THotSwap& p) noexcept
  58. : THotSwap(p.AtomicLoad())
  59. {
  60. }
  61. THotSwap(THotSwap&& other) noexcept {
  62. DoSwap(RawPtr, other.RawPtr); // we don't need thread safety, because both objects are local
  63. }
  64. ~THotSwap() noexcept {
  65. AtomicStore(nullptr);
  66. }
  67. THotSwap& operator=(const THotSwap& p) noexcept {
  68. AtomicStore(p.AtomicLoad());
  69. return *this;
  70. }
  71. /// Wait-free read pointer to object
  72. ///
  73. /// @returns Current value of stored object
  74. TPtr AtomicLoad() const noexcept {
  75. const TAtomicBase lockIndex = GetLockIndex();
  76. auto guard = Guard(WriterLocks[lockIndex]); // non-blocking (for other AtomicLoad()'s) guard
  77. return GetRawPtr();
  78. }
  79. /// Update to new object
  80. ///
  81. /// @param[in] p New value to store
  82. void AtomicStore(T* p) noexcept;
  83. /// Update to new object
  84. ///
  85. /// @param[in] p New value to store
  86. void AtomicStore(const TPtr& p) noexcept {
  87. AtomicStore(p.Get());
  88. }
  89. private:
  90. T* GetRawPtr() const noexcept {
  91. return reinterpret_cast<T*>(AtomicGet(RawPtr));
  92. }
  93. TAtomicBase GetLockIndex() const noexcept {
  94. return AtomicGet(LockIndex);
  95. }
  96. TAtomicBase SwitchLockIndex() noexcept; // returns previous index value
  97. void SwitchRawPtr(T* from, T* to) noexcept;
  98. void WaitReaders() noexcept;
  99. private:
  100. TAtomic RawPtr = 0; // T* // Pointer to current value
  101. static_assert(sizeof(TAtomic) == sizeof(T*), "TAtomic can't represent a pointer value");
  102. TAdaptiveLock UpdateMutex; // Guarantee that AtomicStore() will be one at a time
  103. mutable NHotSwapPrivate::TWriterLock WriterLocks[2]; // Guarantee that AtomicStore() will wait for all concurrent AtomicLoad()'s completion
  104. TAtomic LockIndex = 0;
  105. };
  106. // Atomic operations of AtomicLoad:
  107. // r:1 index = LockIndex
  108. // r:2 WriterLocks[index].ReadersCount++
  109. // r:3 p = RawPtr
  110. // r:4 p->RefCount++
  111. // r:5 WriterLocks[index].ReadersCount--
  112. // Important atomic operations of AtomicStore(newRawPtr):
  113. // w:1 RawPtr = newRawPtr
  114. // w:2 LockIndex = 1
  115. // w:3 WriterLocks[0].Wait()
  116. // w:4 LockIndex = 0
  117. // w:5 WriterLocks[1].Wait()
  118. // w:3 (first wait) is needed for sequences:
  119. // r:1-3, w:1-2, r:4-5, w:3-5 // the most frequent case
  120. // w1:1, r:1, w1:2-5, r:2-3, w2:1-2, r:4-5, w2:3-5
  121. // w:5 (second wait) is needed for sequences:
  122. // w1:1-2, r:1, w1:3-5, r:2-3, w2:1-4, r:4-5, w2:5
  123. // If there were only one wait, the writer in this sequence
  124. // would not wait for the corresponding reader
  125. // w1, w2 - two different writers
  126. template <class T, class Ops>
  127. void THotSwap<T, Ops>::AtomicStore(T* p) noexcept {
  128. TPtr oldPtr;
  129. with_lock (UpdateMutex) {
  130. oldPtr = GetRawPtr();
  131. SwitchRawPtr(oldPtr.Get(), p);
  132. Y_ASSERT(!oldPtr || oldPtr.RefCount() > 0);
  133. // Wait all AtomicLoad()'s to properly take old pointer value concurrently
  134. WaitReaders();
  135. // Release lock and then kill (maybe) old object
  136. }
  137. }
  138. template <class T, class Ops>
  139. TAtomicBase THotSwap<T, Ops>::SwitchLockIndex() noexcept {
  140. const TAtomicBase prevIndex = AtomicGet(LockIndex);
  141. Y_ASSERT(prevIndex == 0 || prevIndex == 1);
  142. AtomicSet(LockIndex, prevIndex ^ 1);
  143. return prevIndex;
  144. }
  145. template <class T, class Ops>
  146. void THotSwap<T, Ops>::WaitReaders() noexcept {
  147. WriterLocks[SwitchLockIndex()].WaitAllReaders();
  148. WriterLocks[SwitchLockIndex()].WaitAllReaders();
  149. }
  150. template <class T, class Ops>
  151. void THotSwap<T, Ops>::SwitchRawPtr(T* from, T* to) noexcept {
  152. if (to)
  153. Ops::Ref(to); // Ref() for new value
  154. AtomicSet(RawPtr, reinterpret_cast<TAtomicBase>(to));
  155. if (from)
  156. Ops::UnRef(from); // Unref() for old value
  157. }