sliding_window.h 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. #pragma once
  2. #include <util/datetime/base.h>
  3. #include <util/generic/vector.h>
  4. #include <util/system/guard.h>
  5. #include <util/system/mutex.h>
  6. #include <util/system/types.h>
  7. #include <util/system/yassert.h>
  8. #include <functional>
  9. #include <limits>
  10. namespace NSlidingWindow {
  11. namespace NPrivate {
  12. template <class TValueType_, class TCmp, TValueType_ initialValue> // std::less for max, std::greater for min
  13. struct TMinMaxOperationImpl {
  14. using TValueType = TValueType_;
  15. using TValueVector = TVector<TValueType>;
  16. static constexpr TValueType InitialValue() {
  17. return initialValue;
  18. }
  19. // Updates value in current bucket and returns window value
  20. static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
  21. Y_ASSERT(index < buckets.size());
  22. TCmp cmp;
  23. TValueType& curVal = buckets[index];
  24. if (cmp(curVal, newVal)) {
  25. curVal = newVal;
  26. if (cmp(windowValue, newVal)) {
  27. windowValue = newVal;
  28. }
  29. }
  30. return windowValue;
  31. }
  32. static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, const size_t firstElemIndex, const size_t bucketsToClear) {
  33. Y_ASSERT(!buckets.empty());
  34. Y_ASSERT(firstElemIndex < buckets.size());
  35. Y_ASSERT(bucketsToClear <= buckets.size());
  36. TCmp cmp;
  37. bool needRecalc = false;
  38. size_t current = firstElemIndex;
  39. const size_t arraySize = buckets.size();
  40. for (size_t i = 0; i < bucketsToClear; ++i) {
  41. TValueType& curVal = buckets[current];
  42. if (!needRecalc && windowValue == curVal) {
  43. needRecalc = true;
  44. }
  45. curVal = InitialValue();
  46. current = (current + 1) % arraySize;
  47. }
  48. if (needRecalc) {
  49. windowValue = InitialValue();
  50. for (size_t i = 0; i < firstElemIndex; ++i) {
  51. const TValueType val = buckets[i];
  52. if (cmp(windowValue, val)) {
  53. windowValue = val;
  54. }
  55. }
  56. for (size_t i = current, size = buckets.size(); i < size; ++i) {
  57. const TValueType val = buckets[i];
  58. if (cmp(windowValue, val)) {
  59. windowValue = val;
  60. }
  61. }
  62. }
  63. return windowValue;
  64. }
  65. };
  66. }
  67. template <class TValueType>
  68. using TMaxOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::less<TValueType>, std::numeric_limits<TValueType>::min()>;
  69. template <class TValueType>
  70. using TMinOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::greater<TValueType>, std::numeric_limits<TValueType>::max()>;
  71. template <class TValueType_>
  72. struct TSumOperation {
  73. using TValueType = TValueType_;
  74. using TValueVector = TVector<TValueType>;
  75. static constexpr TValueType InitialValue() {
  76. return TValueType(); // zero
  77. }
  78. // Updates value in current bucket and returns window value
  79. static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
  80. Y_ASSERT(index < buckets.size());
  81. buckets[index] += newVal;
  82. windowValue += newVal;
  83. return windowValue;
  84. }
  85. static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) {
  86. Y_ASSERT(!buckets.empty());
  87. Y_ASSERT(firstElemIndex < buckets.size());
  88. Y_ASSERT(bucketsToClear <= buckets.size());
  89. const size_t arraySize = buckets.size();
  90. for (size_t i = 0; i < bucketsToClear; ++i) {
  91. TValueType& curVal = buckets[firstElemIndex];
  92. windowValue -= curVal;
  93. curVal = InitialValue();
  94. firstElemIndex = (firstElemIndex + 1) % arraySize;
  95. }
  96. return windowValue;
  97. }
  98. };
  99. /////////////////////////////////////////////////////////////////////////////////////////
  100. // TSlidingWindow
  101. /////////////////////////////////////////////////////////////////////////////////////////
  102. template <class TOperation, class TMutexImpl = TFakeMutex>
  103. class TSlidingWindow {
  104. public:
  105. using TValueType = typename TOperation::TValueType;
  106. using TValueVector = TVector<TValueType>;
  107. using TSizeType = typename TValueVector::size_type;
  108. public:
  109. TSlidingWindow(const TDuration& length, TSizeType partsNum)
  110. : Mutex()
  111. , Buckets(partsNum, TOperation::InitialValue()) // vector of size partsNum initialized with initial value
  112. , WindowValue(TOperation::InitialValue())
  113. , FirstElem(0)
  114. , PeriodStart()
  115. , Length(length)
  116. , MicroSecondsPerBucket(length.MicroSeconds() / partsNum)
  117. {
  118. }
  119. TSlidingWindow(const TSlidingWindow& w)
  120. : Mutex()
  121. {
  122. TGuard<TMutexImpl> guard(&w.Mutex);
  123. Buckets = w.Buckets;
  124. WindowValue = w.WindowValue;
  125. FirstElem = w.FirstElem;
  126. PeriodStart = w.PeriodStart;
  127. Length = w.Length;
  128. MicroSecondsPerBucket = w.MicroSecondsPerBucket;
  129. }
  130. TSlidingWindow(TSlidingWindow&&) = default;
  131. TSlidingWindow& operator=(TSlidingWindow&&) = default;
  132. TSlidingWindow& operator=(const TSlidingWindow&) = delete;
  133. // Period of time
  134. const TDuration& GetDuration() const {
  135. return Length;
  136. }
  137. // Update window with new value and time
  138. TValueType Update(TValueType val, TInstant t) {
  139. TGuard<TMutexImpl> guard(&Mutex);
  140. AdvanceTime(t);
  141. UpdateCurrentBucket(val);
  142. return WindowValue;
  143. }
  144. // Update just time, without new values
  145. TValueType Update(TInstant t) {
  146. TGuard<TMutexImpl> guard(&Mutex);
  147. AdvanceTime(t);
  148. return WindowValue;
  149. }
  150. // Get current window value (without updating current time)
  151. TValueType GetValue() const {
  152. TGuard<TMutexImpl> guard(&Mutex);
  153. return WindowValue;
  154. }
  155. private:
  156. void UpdateCurrentBucket(TValueType val) {
  157. const TSizeType arraySize = Buckets.size();
  158. const TSizeType pos = (FirstElem + arraySize - 1) % arraySize;
  159. WindowValue = TOperation::UpdateBucket(WindowValue, Buckets, pos, val);
  160. }
  161. void AdvanceTime(const TInstant& time) {
  162. if (time < PeriodStart + Length) {
  163. return;
  164. }
  165. if (PeriodStart.MicroSeconds() == 0) {
  166. PeriodStart = time - Length;
  167. return;
  168. }
  169. const TInstant& newPeriodStart = time - Length;
  170. const ui64 tmDiff = (newPeriodStart - PeriodStart).MicroSeconds();
  171. const TSizeType bucketsDiff = tmDiff / MicroSecondsPerBucket;
  172. const TSizeType arraySize = Buckets.size();
  173. const TSizeType buckets = Min(bucketsDiff, arraySize);
  174. WindowValue = TOperation::ClearBuckets(WindowValue, Buckets, FirstElem, buckets);
  175. FirstElem = (FirstElem + buckets) % arraySize;
  176. PeriodStart += TDuration::MicroSeconds(bucketsDiff * MicroSecondsPerBucket);
  177. // Check that PeriodStart lags behind newPeriodStart
  178. // (which is actual, uptodate, precise and equal to time - Length) not more
  179. // then MicroSecondsPerBucket
  180. Y_ASSERT(newPeriodStart >= PeriodStart);
  181. Y_ASSERT((newPeriodStart - PeriodStart).MicroSeconds() <= MicroSecondsPerBucket);
  182. }
  183. mutable TMutexImpl Mutex;
  184. TValueVector Buckets;
  185. TValueType WindowValue;
  186. TSizeType FirstElem;
  187. TInstant PeriodStart;
  188. TDuration Length;
  189. ui64 MicroSecondsPerBucket;
  190. };
  191. }