bucket_quoter.h 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. #pragma once
  2. #include <library/cpp/deprecated/atomic/atomic.h>
  3. #include <util/datetime/base.h>
  4. #include <util/system/mutex.h>
  5. #include <util/system/hp_timer.h>
  /* Token bucket.
   * Limits a flow to *inflow* units per second on average, allowing bursts of up to *capacity* units.
   * Do not use for STRICT flow control.
   */
  10. /* samples: create and use quoter sending 1000 bytes per second on average,
  11. with up to 60 seconds quota buildup.
  12. TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
  13. for (;;) {
  14. T *msg = get_message();
  15. quoter.Sleep();
  16. quoter.Use(msg->GetSize());
  17. send_message(msg);
  18. }
  19. ----------------------------
  20. TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
  21. for (;;) {
  22. T *msg = get_message();
  23. while (! quoter.IsAvail()) {
  24. // do something else
  25. }
  26. quoter.Use(msg->GetSize());
  27. send_message(msg);
  28. }
  29. */
  30. struct TInstantTimerMs {
  31. using TTime = TInstant;
  32. static constexpr ui64 Resolution = 1000ull; // milliseconds
  33. static TTime Now() {
  34. return TInstant::Now();
  35. }
  36. static ui64 Duration(TTime from, TTime to) {
  37. return (to - from).MilliSeconds();
  38. }
  39. };
  40. struct THPTimerUs {
  41. using TTime = NHPTimer::STime;
  42. static constexpr ui64 Resolution = 1000000ull; // microseconds
  43. static TTime Now() {
  44. NHPTimer::STime ret;
  45. NHPTimer::GetTime(&ret);
  46. return ret;
  47. }
  48. static ui64 Duration(TTime from, TTime to) {
  49. i64 cycles = to - from;
  50. if (cycles > 0) {
  51. return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate());
  52. } else {
  53. return 0;
  54. }
  55. }
  56. };
  57. template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
  58. class TBucketQuoter {
  59. public:
  60. using TTime = typename Timer::TTime;
  61. struct TResult {
  62. i64 Before;
  63. i64 After;
  64. ui64 Seqno;
  65. };
  66. /* fixed quota */
  67. TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
  68. StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
  69. StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
  70. : MsgPassed(msgPassed)
  71. , BucketUnderflows(bucketUnderflows)
  72. , TokensUsed(tokensUsed)
  73. , UsecWaited(usecWaited)
  74. , AggregateInflow(aggregateInflow)
  75. , Bucket(fill ? capacity : 0)
  76. , LastAdd(Timer::Now())
  77. , InflowTokensPerSecond(&FixedInflow)
  78. , BucketTokensCapacity(&FixedCapacity)
  79. , FixedInflow(inflow)
  80. , FixedCapacity(capacity)
  81. {
  82. /* no-op */
  83. }
  84. /* adjustable quotas */
  85. TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
  86. StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
  87. StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
  88. : MsgPassed(msgPassed)
  89. , BucketUnderflows(bucketUnderflows)
  90. , TokensUsed(tokensUsed)
  91. , UsecWaited(usecWaited)
  92. , AggregateInflow(aggregateInflow)
  93. , Bucket(fill ? AtomicGet(*capacity) : 0)
  94. , LastAdd(Timer::Now())
  95. , InflowTokensPerSecond(inflow)
  96. , BucketTokensCapacity(capacity)
  97. {
  98. /* no-op */
  99. }
  100. bool IsAvail() {
  101. TGuard<Lock> g(BucketMutex);
  102. FillBucket();
  103. if (Bucket < 0) {
  104. if (BucketUnderflows) {
  105. (*BucketUnderflows)++;
  106. }
  107. }
  108. return (Bucket >= 0);
  109. }
  110. bool IsAvail(TResult& res) {
  111. TGuard<Lock> g(BucketMutex);
  112. res.Before = Bucket;
  113. FillBucket();
  114. res.After = Bucket;
  115. res.Seqno = ++Seqno;
  116. if (Bucket < 0) {
  117. if (BucketUnderflows) {
  118. (*BucketUnderflows)++;
  119. }
  120. }
  121. return (Bucket >= 0);
  122. }
  123. ui64 GetAvail() {
  124. TGuard<Lock> g(BucketMutex);
  125. FillBucket();
  126. return Max<i64>(0, Bucket);
  127. }
  128. ui64 GetAvail(TResult& res) {
  129. TGuard<Lock> g(BucketMutex);
  130. res.Before = Bucket;
  131. FillBucket();
  132. res.After = Bucket;
  133. res.Seqno = ++Seqno;
  134. return Max<i64>(0, Bucket);
  135. }
  136. void Use(ui64 tokens, bool sleep = false) {
  137. TGuard<Lock> g(BucketMutex);
  138. UseNoLock(tokens, sleep);
  139. }
  140. void Use(ui64 tokens, TResult& res, bool sleep = false) {
  141. TGuard<Lock> g(BucketMutex);
  142. res.Before = Bucket;
  143. UseNoLock(tokens, sleep);
  144. res.After = Bucket;
  145. res.Seqno = ++Seqno;
  146. }
  147. i64 UseAndFill(ui64 tokens) {
  148. TGuard<Lock> g(BucketMutex);
  149. UseNoLock(tokens);
  150. FillBucket();
  151. return Bucket;
  152. }
  153. void Add(ui64 tokens) {
  154. TGuard<Lock> g(BucketMutex);
  155. AddNoLock(tokens);
  156. }
  157. void Add(ui64 tokens, TResult& res) {
  158. TGuard<Lock> g(BucketMutex);
  159. res.Before = Bucket;
  160. AddNoLock(tokens);
  161. res.After = Bucket;
  162. res.Seqno = ++Seqno;
  163. }
  164. ui32 GetWaitTime() {
  165. TGuard<Lock> g(BucketMutex);
  166. FillBucket();
  167. if (Bucket >= 0) {
  168. return 0;
  169. }
  170. ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
  171. return usec;
  172. }
  173. ui32 GetWaitTime(TResult& res) {
  174. TGuard<Lock> g(BucketMutex);
  175. res.Before = Bucket;
  176. FillBucket();
  177. res.After = Bucket;
  178. res.Seqno = ++Seqno;
  179. if (Bucket >= 0) {
  180. return 0;
  181. }
  182. ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
  183. return usec;
  184. }
  185. void Sleep() {
  186. while (!IsAvail()) {
  187. ui32 delay = GetWaitTime();
  188. if (delay != 0) {
  189. usleep(delay);
  190. if (UsecWaited) {
  191. (*UsecWaited) += delay;
  192. }
  193. }
  194. }
  195. }
  196. private:
  197. void FillBucket() {
  198. TTime now = Timer::Now();
  199. ui64 elapsed = Timer::Duration(LastAdd, now);
  200. if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
  201. ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
  202. if (AggregateInflow) {
  203. *AggregateInflow += inflow;
  204. }
  205. Bucket += inflow;
  206. if (Bucket > *BucketTokensCapacity) {
  207. Bucket = *BucketTokensCapacity;
  208. }
  209. LastAdd = now;
  210. }
  211. }
  212. void UseNoLock(ui64 tokens, bool sleep = false) {
  213. if (sleep)
  214. Sleep();
  215. Bucket -= tokens;
  216. if (TokensUsed) {
  217. (*TokensUsed) += tokens;
  218. }
  219. if (MsgPassed) {
  220. (*MsgPassed)++;
  221. }
  222. }
  223. void AddNoLock(ui64 tokens) {
  224. Bucket += tokens;
  225. if (Bucket > *BucketTokensCapacity) {
  226. Bucket = *BucketTokensCapacity;
  227. }
  228. }
  229. StatCounter* MsgPassed;
  230. StatCounter* BucketUnderflows;
  231. StatCounter* TokensUsed;
  232. StatCounter* UsecWaited;
  233. StatCounter* AggregateInflow;
  234. i64 Bucket;
  235. TTime LastAdd;
  236. Lock BucketMutex;
  237. ui64 Seqno = 0;
  238. TAtomic* InflowTokensPerSecond;
  239. TAtomic* BucketTokensCapacity;
  240. TAtomic FixedInflow;
  241. TAtomic FixedCapacity;
  242. };