// -*- C++ -*-
//===----------------------------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#ifndef _LIBCPP___STOP_TOKEN_STOP_STATE_H
#define _LIBCPP___STOP_TOKEN_STOP_STATE_H

#include <__assert>
#include <__availability>
#include <__config>
#include <__stop_token/atomic_unique_lock.h>
#include <__stop_token/intrusive_list_view.h>
#include <__thread/id.h>
#include <atomic>
#include <cstdint>

#if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER)
#  pragma GCC system_header
#endif

_LIBCPP_BEGIN_NAMESPACE_STD

#if _LIBCPP_STD_VER >= 20 && !defined(_LIBCPP_HAS_NO_THREADS)

struct __stop_callback_base : __intrusive_node_base<__stop_callback_base> {
  using __callback_fn_t = void(__stop_callback_base*) noexcept;
  _LIBCPP_HIDE_FROM_ABI explicit __stop_callback_base(__callback_fn_t* __callback_fn) : __callback_fn_(__callback_fn) {}

  _LIBCPP_HIDE_FROM_ABI void __invoke() noexcept { __callback_fn_(this); }

  __callback_fn_t* __callback_fn_;
  atomic<bool> __completed_ = false;
  bool* __destroyed_        = nullptr;
};
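
// A minimal sketch (illustrative only; this is not the actual std::stop_callback
// implementation) of how a derived callback plugs into __callback_fn_: the base
// stores a plain function pointer that downcasts `this` back to the derived type
// and invokes the stored user callable.
//
//   template <class _Callback>
//   struct __example_callback : __stop_callback_base {
//     _Callback __callback_;
//
//     explicit __example_callback(_Callback __cb)
//         : __stop_callback_base(&__trampoline), __callback_(std::move(__cb)) {}
//
//     static void __trampoline(__stop_callback_base* __base) noexcept {
//       static_cast<__example_callback*>(__base)->__callback_();
//     }
//   };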

class __stop_state {
  static constexpr uint32_t __stop_requested_bit        = 1;
  static constexpr uint32_t __callback_list_locked_bit  = 1 << 1;
  static constexpr uint32_t __stop_source_counter_shift = 2;

  // The "stop_source counter" is not used for lifetime reference counting.
  // When the number of stop_sources reaches 0, stop_possible on the remaining
  // stop_tokens must return false. We need this counter to track that.
  //
  // The "callback list locked" bit implements the atomic_unique_lock used to
  // guard operations on the callback list.
  //
  //        31 - 2         |          1           |       0        |
  //  stop_source counter  | callback list locked | stop_requested |
  atomic<uint32_t> __state_ = 0;
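
  // Worked example of the encoding above (illustrative): with three live
  // stop_sources, a stop already requested, and the callback list unlocked,
  //   __state_ == (3 << __stop_source_counter_shift) | __stop_requested_bit
  //            == 0b1101 == 13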

  // Reference count for stop_token + stop_callback + stop_source.
  // When the counter reaches zero, the state is destroyed.
  // It is used by __intrusive_shared_ptr, but it is stored here for better layout.
  atomic<uint32_t> __ref_count_ = 0;

  using __state_t            = uint32_t;
  using __callback_list_lock = __atomic_unique_lock<__state_t, __callback_list_locked_bit>;
  using __callback_list      = __intrusive_list_view<__stop_callback_base>;

  __callback_list __callback_list_;
  __thread_id __requesting_thread_;

public:
  _LIBCPP_HIDE_FROM_ABI __stop_state() noexcept = default;

  _LIBCPP_HIDE_FROM_ABI void __increment_stop_source_counter() noexcept {
    _LIBCPP_ASSERT_UNCATEGORIZED(
        __state_.load(std::memory_order_relaxed) <= static_cast<__state_t>(~(1 << __stop_source_counter_shift)),
        "stop_source's counter has reached the maximum. Incrementing the counter will overflow");
    __state_.fetch_add(1 << __stop_source_counter_shift, std::memory_order_relaxed);
  }

  // We are not destroying the object when the counter reaches zero, nor do we have
  // operations that depend on the ordering of the decrement. relaxed is enough.
  _LIBCPP_HIDE_FROM_ABI void __decrement_stop_source_counter() noexcept {
    _LIBCPP_ASSERT_UNCATEGORIZED(
        __state_.load(std::memory_order_relaxed) >= static_cast<__state_t>(1 << __stop_source_counter_shift),
        "stop_source's counter is 0. Decrementing the counter will underflow");
    __state_.fetch_sub(1 << __stop_source_counter_shift, std::memory_order_relaxed);
  }

  _LIBCPP_HIDE_FROM_ABI bool __stop_requested() const noexcept {
    // acquire because, per [thread.stoptoken.intro], a call to request_stop that returns true
    // synchronizes with a call to stop_requested on an associated stop_token or stop_source
    // object that returns true.
    // request_stop's compare_exchange_weak has release semantics, which sync with this acquire.
    return (__state_.load(std::memory_order_acquire) & __stop_requested_bit) != 0;
  }

  _LIBCPP_HIDE_FROM_ABI bool __stop_possible_for_stop_token() const noexcept {
    // [stoptoken.mem] returns false if "a stop request was not made and there are no associated
    // stop_source objects".
    // TODO: Can this be std::memory_order_relaxed, given that the standard requires nothing here
    // beyond not introducing a data race?
    __state_t __current_state = __state_.load(std::memory_order_acquire);
    return ((__current_state & __stop_requested_bit) != 0) || ((__current_state >> __stop_source_counter_shift) != 0);
  }

  _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __request_stop() noexcept {
    auto __cb_list_lock = __try_lock_for_request_stop();
    if (!__cb_list_lock.__owns_lock()) {
      return false;
    }
    __requesting_thread_ = this_thread::get_id();

    while (!__callback_list_.__empty()) {
      auto __cb = __callback_list_.__pop_front();

      // allow other callbacks to be removed while invoking the current callback
      __cb_list_lock.__unlock();

      bool __destroyed   = false;
      __cb->__destroyed_ = &__destroyed;

      __cb->__invoke();

      // __cb's invoke function could potentially delete itself. We need to check this
      // before accessing __cb's members again.
      if (!__destroyed) {
        // We need to reset the __destroyed_ pointer to nullptr; otherwise it would keep
        // pointing to a local variable that is destroyed at the end of this iteration.
        __cb->__destroyed_ = nullptr;

        // [stopcallback.cons] If callback is concurrently executing on another thread, then the return
        // from the invocation of callback strongly happens before ([intro.races]) callback is destroyed.
        // This release syncs with the acquire in __remove_callback.
        __cb->__completed_.store(true, std::memory_order_release);
        __cb->__completed_.notify_all();
      }

      __cb_list_lock.__lock();
    }

    return true;
  }
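
  // The __destroyed_ handshake above, spelled out (illustrative): if the callback
  // destroys itself from inside __invoke(), __remove_callback runs re-entrantly on
  // this same thread, observes that the node has already been popped, and writes
  // true through the __destroyed_ pointer we planted. On return from __invoke() we
  // therefore skip every further access to *__cb, including the __completed_ store.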

  _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __add_callback(__stop_callback_base* __cb) noexcept {
    // If a stop has already been requested, do not try to request it again.
    const auto __give_up_trying_to_lock_condition = [__cb](__state_t __state) {
      if ((__state & __stop_requested_bit) != 0) {
        // Stop already requested: synchronously run the callback; no need to lock the list.
        __cb->__invoke();
        return true;
      }
      // No stop_source left: no need to lock the list to add the callback, as it can never be invoked.
      return (__state >> __stop_source_counter_shift) == 0;
    };

    __callback_list_lock __cb_list_lock(__state_, __give_up_trying_to_lock_condition);
    if (!__cb_list_lock.__owns_lock()) {
      return false;
    }

    __callback_list_.__push_front(__cb);

    return true;
    // Unlock happens here: [thread.stoptoken.intro] Registration of a callback synchronizes with
    // the invocation of that callback.
    // Note: this release syncs with the acquire in request_stop's __try_lock_for_request_stop.
  }
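
  // Caller's view of __add_callback, sketched (assumption: this mirrors how a
  // stop_callback constructor would use it; it is not the actual stop_callback code):
  //
  //   if (__state->__add_callback(&__cb)) {
  //     // __cb is now linked into the list; the owner must call
  //     // __remove_callback(&__cb) before __cb is destroyed.
  //   } else {
  //     // __cb was never enqueued: either it was already invoked synchronously
  //     // (a stop had been requested) or it can never fire (no stop_source left).
  //     // The owner records this so that its destructor skips __remove_callback.
  //   }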

  // called by the destructor of stop_callback
  _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI void __remove_callback(__stop_callback_base* __cb) noexcept {
    __callback_list_lock __cb_list_lock(__state_);

    // If the condition below holds, a request_stop call has just popped __cb from
    // the list and could be executing it right now.
    bool __potentially_executing_now = __cb->__prev_ == nullptr && !__callback_list_.__is_head(__cb);
    if (__potentially_executing_now) {
      auto __requested_thread = __requesting_thread_;
      __cb_list_lock.__unlock();

      if (std::this_thread::get_id() != __requested_thread) {
        // [stopcallback.cons] If callback is concurrently executing on another thread, then the return
        // from the invocation of callback strongly happens before ([intro.races]) callback is destroyed.
        __cb->__completed_.wait(false, std::memory_order_acquire);
      } else {
        // The destructor of stop_callback runs on the same thread as the thread that invokes the
        // callback, i.e. the callback is potentially invoking its own destructor. Set the flag so
        // the invoking side does not access the destroyed members afterwards.
        if (__cb->__destroyed_) {
          *__cb->__destroyed_ = true;
        }
      }
    } else {
      __callback_list_.__remove(__cb);
    }
  }
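
  // Why the __prev_/__is_head test above detects a popped node (illustrative; this
  // relies on how __intrusive_list_view is used here): a node that is still linked
  // either is the head or has a non-null __prev_. __request_stop removes nodes only
  // via __pop_front, so a node with a null __prev_ that is not the head must have
  // just been handed to __request_stop for invocation.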

private:
  _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI __callback_list_lock __try_lock_for_request_stop() noexcept {
    // If a stop has already been requested, do not try to request stop or lock the list again.
    const auto __lock_fail_condition = [](__state_t __state) { return (__state & __stop_requested_bit) != 0; };

    // set the locked and requested bits at the same time
    const auto __after_lock_state = [](__state_t __state) {
      return __state | __callback_list_locked_bit | __stop_requested_bit;
    };

    // acq because [thread.stoptoken.intro] Registration of a callback synchronizes with the
    // invocation of that callback. We are going to invoke the callbacks after acquiring the lock;
    // the acquire lets us see the registration of a callback (and any other writes that
    // happen-before the add_callback).
    // Note: the rel (unlock) in add_callback syncs with this acq.
    //
    // rel because [thread.stoptoken.intro] A call to request_stop that returns true synchronizes
    // with a call to stop_requested on an associated stop_token or stop_source object that returns
    // true. We need to make sure that all writes (including user code) before request_stop are
    // made visible to the threads that are waiting for `stop_requested == true`.
    // Note: this rel syncs with the acq in `stop_requested`.
    const auto __locked_ordering = std::memory_order_acq_rel;

    return __callback_list_lock(__state_, __lock_fail_condition, __after_lock_state, __locked_ordering);
  }

  template <class _Tp>
  friend struct __intrusive_shared_ptr_traits;
};

template <class _Tp>
struct __intrusive_shared_ptr_traits;

template <>
struct __intrusive_shared_ptr_traits<__stop_state> {
  _LIBCPP_HIDE_FROM_ABI static atomic<uint32_t>& __get_atomic_ref_count(__stop_state& __state) {
    return __state.__ref_count_;
  }
};
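
// Illustrative consumer of the traits hook above (assumption: this is only the
// general shape of __intrusive_shared_ptr, not a verbatim copy of it): the smart
// pointer manipulates the reference count exclusively through
// __get_atomic_ref_count and destroys the state when the count drops to zero.
//
//   // on copy:
//   __intrusive_shared_ptr_traits<__stop_state>::__get_atomic_ref_count(*__raw_ptr_)
//       .fetch_add(1, std::memory_order_relaxed);
//   // on destruction:
//   if (__intrusive_shared_ptr_traits<__stop_state>::__get_atomic_ref_count(*__raw_ptr_)
//           .fetch_sub(1, std::memory_order_acq_rel) == 1)
//     delete __raw_ptr_;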

#endif // _LIBCPP_STD_VER >= 20 && !defined(_LIBCPP_HAS_NO_THREADS)

_LIBCPP_END_NAMESPACE_STD

#endif // _LIBCPP___STOP_TOKEN_STOP_STATE_H