123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- #pragma once
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include <util/datetime/base.h>
- #include <util/system/mutex.h>
- #include <util/system/hp_timer.h>
/* Token bucket.
 * Produces an average flow of *inflow* units per second, allowing bursts of up to *capacity* units.
 * Do not use for STRICT flow control.
 */
/* Samples: create and use a quoter that sends 1000 bytes per second on average,
   with up to 60 seconds of quota buildup (TCounter is any counter type supporting ++ and +=).

   TBucketQuoter<TCounter> quoter(1000, 60000, nullptr, nullptr, nullptr);
   for (;;) {
       T* msg = get_message();
       quoter.Sleep();
       quoter.Use(msg->GetSize());
       send_message(msg);
   }
   ----------------------------
   TBucketQuoter<TCounter> quoter(1000, 60000, nullptr, nullptr, nullptr);
   for (;;) {
       T* msg = get_message();
       while (!quoter.IsAvail()) {
           // do something else
       }
       quoter.Use(msg->GetSize());
       send_message(msg);
   }
*/
- struct TInstantTimerMs {
- using TTime = TInstant;
- static constexpr ui64 Resolution = 1000ull; // milliseconds
- static TTime Now() {
- return TInstant::Now();
- }
- static ui64 Duration(TTime from, TTime to) {
- return (to - from).MilliSeconds();
- }
- };
- struct THPTimerUs {
- using TTime = NHPTimer::STime;
- static constexpr ui64 Resolution = 1000000ull; // microseconds
- static TTime Now() {
- NHPTimer::STime ret;
- NHPTimer::GetTime(&ret);
- return ret;
- }
- static ui64 Duration(TTime from, TTime to) {
- i64 cycles = to - from;
- if (cycles > 0) {
- return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate());
- } else {
- return 0;
- }
- }
- };
- template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
- class TBucketQuoter {
- public:
- using TTime = typename Timer::TTime;
- struct TResult {
- i64 Before;
- i64 After;
- ui64 Seqno;
- };
- /* fixed quota */
- TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
- StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
- StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
- : MsgPassed(msgPassed)
- , BucketUnderflows(bucketUnderflows)
- , TokensUsed(tokensUsed)
- , UsecWaited(usecWaited)
- , AggregateInflow(aggregateInflow)
- , Bucket(fill ? capacity : 0)
- , LastAdd(Timer::Now())
- , InflowTokensPerSecond(&FixedInflow)
- , BucketTokensCapacity(&FixedCapacity)
- , FixedInflow(inflow)
- , FixedCapacity(capacity)
- {
- /* no-op */
- }
- /* adjustable quotas */
- TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
- StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
- StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
- : MsgPassed(msgPassed)
- , BucketUnderflows(bucketUnderflows)
- , TokensUsed(tokensUsed)
- , UsecWaited(usecWaited)
- , AggregateInflow(aggregateInflow)
- , Bucket(fill ? AtomicGet(*capacity) : 0)
- , LastAdd(Timer::Now())
- , InflowTokensPerSecond(inflow)
- , BucketTokensCapacity(capacity)
- {
- /* no-op */
- }
- bool IsAvail() {
- TGuard<Lock> g(BucketMutex);
- FillBucket();
- if (Bucket < 0) {
- if (BucketUnderflows) {
- (*BucketUnderflows)++;
- }
- }
- return (Bucket >= 0);
- }
- bool IsAvail(TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- FillBucket();
- res.After = Bucket;
- res.Seqno = ++Seqno;
- if (Bucket < 0) {
- if (BucketUnderflows) {
- (*BucketUnderflows)++;
- }
- }
- return (Bucket >= 0);
- }
- ui64 GetAvail() {
- TGuard<Lock> g(BucketMutex);
- FillBucket();
- return Max<i64>(0, Bucket);
- }
- ui64 GetAvail(TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- FillBucket();
- res.After = Bucket;
- res.Seqno = ++Seqno;
- return Max<i64>(0, Bucket);
- }
- void Use(ui64 tokens, bool sleep = false) {
- TGuard<Lock> g(BucketMutex);
- UseNoLock(tokens, sleep);
- }
- void Use(ui64 tokens, TResult& res, bool sleep = false) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- UseNoLock(tokens, sleep);
- res.After = Bucket;
- res.Seqno = ++Seqno;
- }
- i64 UseAndFill(ui64 tokens) {
- TGuard<Lock> g(BucketMutex);
- FillBucket();
- UseNoLock(tokens);
- return Bucket;
- }
- void Add(ui64 tokens) {
- TGuard<Lock> g(BucketMutex);
- AddNoLock(tokens);
- }
- void Add(ui64 tokens, TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- AddNoLock(tokens);
- res.After = Bucket;
- res.Seqno = ++Seqno;
- }
- ui32 GetWaitTime() {
- TGuard<Lock> g(BucketMutex);
- FillBucket();
- if (Bucket >= 0) {
- return 0;
- }
- ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
- return usec;
- }
- ui32 GetWaitTime(TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- FillBucket();
- res.After = Bucket;
- res.Seqno = ++Seqno;
- if (Bucket >= 0) {
- return 0;
- }
- ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
- return usec;
- }
- void Sleep() {
- while (!IsAvail()) {
- ui32 delay = GetWaitTime();
- if (delay != 0) {
- usleep(delay);
- if (UsecWaited) {
- (*UsecWaited) += delay;
- }
- }
- }
- }
- private:
- void FillBucket() {
- TTime now = Timer::Now();
- ui64 elapsed = Timer::Duration(LastAdd, now);
- if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
- ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
- if (AggregateInflow) {
- *AggregateInflow += inflow;
- }
- Bucket += inflow;
- if (Bucket > *BucketTokensCapacity) {
- Bucket = *BucketTokensCapacity;
- }
- LastAdd = now;
- }
- }
- void UseNoLock(ui64 tokens, bool sleep = false) {
- if (sleep)
- Sleep();
- Bucket -= tokens;
- if (TokensUsed) {
- (*TokensUsed) += tokens;
- }
- if (MsgPassed) {
- (*MsgPassed)++;
- }
- }
- void AddNoLock(ui64 tokens) {
- Bucket += tokens;
- if (Bucket > *BucketTokensCapacity) {
- Bucket = *BucketTokensCapacity;
- }
- }
- StatCounter* MsgPassed;
- StatCounter* BucketUnderflows;
- StatCounter* TokensUsed;
- StatCounter* UsecWaited;
- StatCounter* AggregateInflow;
- i64 Bucket;
- TTime LastAdd;
- Lock BucketMutex;
- ui64 Seqno = 0;
- TAtomic* InflowTokensPerSecond;
- TAtomic* BucketTokensCapacity;
- TAtomic FixedInflow;
- TAtomic FixedCapacity;
- };
|