12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089 |
- /*
- * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
- * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * 3. The name of the author may not be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
- * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
- * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
- * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
- * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
- * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
- #include "evconfig-private.h"
- #include <sys/types.h>
- #include <limits.h>
- #include <string.h>
- #include <stdlib.h>
- #include "event2/event.h"
- #include "event2/event_struct.h"
- #include "event2/util.h"
- #include "event2/bufferevent.h"
- #include "event2/bufferevent_struct.h"
- #include "event2/buffer.h"
- #include "ratelim-internal.h"
- #include "bufferevent-internal.h"
- #include "mm-internal.h"
- #include "util-internal.h"
- #include "event-internal.h"
- int
- ev_token_bucket_init_(struct ev_token_bucket *bucket,
- const struct ev_token_bucket_cfg *cfg,
- ev_uint32_t current_tick,
- int reinitialize)
- {
- if (reinitialize) {
- /* on reinitialization, we only clip downwards, since we've
- already used who-knows-how-much bandwidth this tick. We
- leave "last_updated" as it is; the next update will add the
- appropriate amount of bandwidth to the bucket.
- */
- if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
- bucket->read_limit = cfg->read_maximum;
- if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
- bucket->write_limit = cfg->write_maximum;
- } else {
- bucket->read_limit = cfg->read_rate;
- bucket->write_limit = cfg->write_rate;
- bucket->last_updated = current_tick;
- }
- return 0;
- }
/**
 * Refill 'bucket' with the tokens accrued between its last update and
 * 'current_tick', clamping each limit to the configured maximum.
 *
 * Returns 1 if the bucket was refilled, 0 if no whole tick has elapsed
 * (or the tick counter appears to have jumped back by more than INT_MAX).
 */
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
- static inline void
- bufferevent_update_buckets(struct bufferevent_private *bev)
- {
- /* Must hold lock on bev. */
- struct timeval now;
- unsigned tick;
- event_base_gettimeofday_cached(bev->bev.ev_base, &now);
- tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
- if (tick != bev->rate_limiting->limit.last_updated)
- ev_token_bucket_update_(&bev->rate_limiting->limit,
- bev->rate_limiting->cfg, tick);
- }
/**
 * Map a wall-clock time to a tick number under configuration 'cfg'.
 *
 * NOTE(review): divides by cfg->msec_per_tick, which is assumed to be
 * nonzero — a cfg whose tick length rounds to less than one millisecond
 * would divide by zero here; confirm ev_token_bucket_cfg_new() rejects
 * such configurations.
 */
ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
- struct ev_token_bucket_cfg *
- ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
- size_t write_rate, size_t write_burst,
- const struct timeval *tick_len)
- {
- struct ev_token_bucket_cfg *r;
- struct timeval g;
- if (! tick_len) {
- g.tv_sec = 1;
- g.tv_usec = 0;
- tick_len = &g;
- }
- if (read_rate > read_burst || write_rate > write_burst ||
- read_rate < 1 || write_rate < 1)
- return NULL;
- if (read_rate > EV_RATE_LIMIT_MAX ||
- write_rate > EV_RATE_LIMIT_MAX ||
- read_burst > EV_RATE_LIMIT_MAX ||
- write_burst > EV_RATE_LIMIT_MAX)
- return NULL;
- r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
- if (!r)
- return NULL;
- r->read_rate = read_rate;
- r->write_rate = write_rate;
- r->read_maximum = read_burst;
- r->write_maximum = write_burst;
- memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
- r->msec_per_tick = (tick_len->tv_sec * 1000) +
- (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
- return r;
- }
/** Release a configuration allocated by ev_token_bucket_cfg_new().
 * NOTE(review): any bufferevent still pointing at 'cfg' keeps a raw
 * pointer (see the "XXX reference-count cfg" note in
 * bufferevent_set_rate_limit); the caller must not free a cfg that is
 * still in use. */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
- /* Default values for max_single_read & max_single_write variables. */
- #define MAX_SINGLE_READ_DEFAULT 16384
- #define MAX_SINGLE_WRITE_DEFAULT 16384
- #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
- #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
- static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
- static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
- static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
- static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	/* Start from the per-operation cap; rate limits only shrink it. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

/* Select the read or write field of a token bucket. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

/* Is the group suspended in the direction we care about? */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket can go negative after a large single operation; callers
	 * see that as "nothing allowed" rather than a negative quota. */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
/** Return the most bytes 'bev' may read right now under its rate limits.
 * Caller must hold the lock on 'bev'. */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}
/** Return the most bytes 'bev' may write right now under its rate limits.
 * Caller must hold the lock on 'bev'. */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}
/** Charge 'bytes' of reading against bev's individual and group buckets,
 * suspending or unsuspending reads as the buckets cross zero.
 *
 * Returns 0 on success, -1 if the refill event could not be scheduled.
 * Caller must hold the lock on 'bev'; the group lock is taken here.
 * Note that 'bytes' may be negative to refund tokens. */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading and arm the refill
			 * timer for the next tick. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Keep the timer if the write side still needs it. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
/** Charge 'bytes' of writing against bev's individual and group buckets,
 * suspending or unsuspending writes as the buckets cross zero.
 *
 * Returns 0 on success, -1 if the refill event could not be scheduled.
 * Caller must hold the lock on 'bev'; the group lock is taken here.
 * Mirror image of bufferevent_decrement_read_buckets_(). */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket exhausted: stop writing and arm the refill
			 * timer for the next tick. */
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Keep the timer if the read side still needs it. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
/** Stop reading on every bufferevent in <b>g</b>.
 * Always returns 0. */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
/** Stop writing on every bufferevent in <b>g</b>.
 * Always returns 0.  Needs the group lock; uses EVLOCK_TRY_LOCK_ on each
 * member to avoid deadlock, since the group lock ordinarily nests inside
 * the bufferevent locks — members we fail to lock discover the suspension
 * later when they consult their group limit. */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* The limit may have been removed between scheduling and firing. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
- /** Helper: grab a random element from a bufferevent group.
- *
- * Requires that we hold the lock on the group.
- */
- static struct bufferevent_private *
- bev_group_random_element_(struct bufferevent_rate_limit_group *group)
- {
- int which;
- struct bufferevent_private *bev;
- /* requires group lock */
- if (!group->n_members)
- return NULL;
- EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
- which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
- bev = LIST_FIRST(&group->members);
- while (which--)
- bev = LIST_NEXT(bev, rate_limiting->next_in_group);
- return bev;
- }
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.

    (Expects locals 'bev' and 'first' of type struct bufferevent_private *
    and a group pointer 'g' to be in scope at the expansion site; the
    second loop covers the members before the random starting point.)
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
	} while (0)
/** Resume reading on the members of <b>g</b>.  Needs the group lock.
 * Members whose locks cannot be taken right now are left suspended and
 * pending_unsuspend_read is set so the next group refill retries them. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
/** Resume writing on the members of <b>g</b>.  Needs the group lock.
 * Members whose locks cannot be taken right now are left suspended and
 * pending_unsuspend_write is set so the next group refill retries them. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend either because a previous unsuspend pass missed some
	 * members (pending_unsuspend_*) or because the bucket has refilled
	 * to at least one minimum share. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
/** Set, replace, or (with cfg == NULL) remove the individual rate limit
 * on 'bev'.  The bufferevent keeps a raw pointer to 'cfg'; the caller
 * must keep it alive while in use (see the XXX below).
 *
 * Returns 0 on success, -1 on allocation failure. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: drop the cfg, lift any bandwidth
		 * suspensions, and cancel the pending refill timer.  The
		 * rlim struct itself is kept for reuse. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		/* The refill event may be pending with the old timeout;
		 * remove it before re-assigning below. */
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	/* If either direction starts out exhausted, arm the refill timer. */
	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
/** Create a new rate-limit group on 'base', copying 'cfg' by value (the
 * caller keeps ownership of cfg).  Schedules the persistent per-tick
 * refill event and seeds the weak RNG used for fair member selection.
 * Returns NULL on allocation failure.
 *
 * NOTE(review): the results of event_add() (see XXXX below) and
 * EVTHREAD_ALLOC_LOCK() are not checked here — confirm whether failure
 * of either should abort group creation. */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
/** Replace the configuration of group 'g' with a copy of 'cfg', clipping
 * the current buckets down to the new maxima and rescheduling the refill
 * event if the tick length changed.
 *
 * Returns 0 on success, -1 if either argument is NULL. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	/* Clip only downward; tokens already granted this tick stand. */
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
- int
- bufferevent_rate_limit_group_set_min_share(
- struct bufferevent_rate_limit_group *g,
- size_t share)
- {
- if (share > EV_SSIZE_MAX)
- return -1;
- g->configured_min_share = share;
- /* Can't set share to less than the one-tick maximum. IOW, at steady
- * state, at least one connection can go per tick. */
- if (share > g->rate_limit_cfg.read_rate)
- share = g->rate_limit_cfg.read_rate;
- if (share > g->rate_limit_cfg.write_rate)
- share = g->rate_limit_cfg.write_rate;
- g->min_share = share;
- return 0;
- }
/** Destroy an empty rate-limit group: cancels its refill event and frees
 * its lock and storage.  Asserts that no members remain. */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
/** Add 'bev' to rate-limit group 'g', allocating its rate_limiting state
 * on first use and detaching it from any previous group.  If the group is
 * currently suspended in either direction, the new member is suspended
 * immediately.
 *
 * Returns 0 on success (including the no-op case of re-adding to the same
 * group), -1 on allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the suspension state under the group lock; apply it to
	 * the new member after releasing it (we still hold the bev lock). */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
/** Public wrapper: remove 'bev' from its group and lift any group-imposed
 * suspension.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
/** Detach 'bev' from its rate-limit group, if any.  When 'unsuspend' is
 * nonzero, also lift BEV_SUSPEND_BW_GROUP suspensions (callers tearing
 * the bufferevent down pass 0 to skip that).  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
- /* ===
- * API functions to expose rate limits.
- *
- * Don't use these from inside Libevent; they're meant to be for use by
- * the program.
- * === */
- /* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_read_max_() is more likely what you want*/
- ev_ssize_t
- bufferevent_get_read_limit(struct bufferevent *bev)
- {
- ev_ssize_t r;
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
- bufferevent_update_buckets(bevp);
- r = bevp->rate_limiting->limit.read_limit;
- } else {
- r = EV_SSIZE_MAX;
- }
- BEV_UNLOCK(bev);
- return r;
- }
- /* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_write_max_() is more likely what you want*/
- ev_ssize_t
- bufferevent_get_write_limit(struct bufferevent *bev)
- {
- ev_ssize_t r;
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
- bufferevent_update_buckets(bevp);
- r = bevp->rate_limiting->limit.write_limit;
- } else {
- r = EV_SSIZE_MAX;
- }
- BEV_UNLOCK(bev);
- return r;
- }
- int
- bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
- {
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (size == 0 || size > EV_SSIZE_MAX)
- bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
- else
- bevp->max_single_read = size;
- BEV_UNLOCK(bev);
- return 0;
- }
- int
- bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
- {
- struct bufferevent_private *bevp;
- BEV_LOCK(bev);
- bevp = BEV_UPCAST(bev);
- if (size == 0 || size > EV_SSIZE_MAX)
- bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
- else
- bevp->max_single_write = size;
- BEV_UNLOCK(bev);
- return 0;
- }
- ev_ssize_t
- bufferevent_get_max_single_read(struct bufferevent *bev)
- {
- ev_ssize_t r;
- BEV_LOCK(bev);
- r = BEV_UPCAST(bev)->max_single_read;
- BEV_UNLOCK(bev);
- return r;
- }
- ev_ssize_t
- bufferevent_get_max_single_write(struct bufferevent *bev)
- {
- ev_ssize_t r;
- BEV_LOCK(bev);
- r = BEV_UPCAST(bev)->max_single_write;
- BEV_UNLOCK(bev);
- return r;
- }
/** Public, locking wrapper around bufferevent_get_read_max_(): how many
 * bytes 'bev' may read right now. */
ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
/** Public, locking wrapper around bufferevent_get_write_max_(): how many
 * bytes 'bev' may write right now. */
ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
/** Return the token-bucket configuration currently attached to 'bev', or
 * NULL if it has none.  The returned pointer is the internal cfg, not a
 * copy; it remains owned by whoever installed it. */
const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);
	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}
	BEV_UNLOCK(bev);

	return cfg;
}
- /* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_read_max_() is more likely what you want*/
- ev_ssize_t
- bufferevent_rate_limit_group_get_read_limit(
- struct bufferevent_rate_limit_group *grp)
- {
- ev_ssize_t r;
- LOCK_GROUP(grp);
- r = grp->rate_limit.read_limit;
- UNLOCK_GROUP(grp);
- return r;
- }
- /* Mostly you don't want to use this function from inside libevent;
- * bufferevent_get_write_max_() is more likely what you want. */
- ev_ssize_t
- bufferevent_rate_limit_group_get_write_limit(
- struct bufferevent_rate_limit_group *grp)
- {
- ev_ssize_t r;
- LOCK_GROUP(grp);
- r = grp->rate_limit.write_limit;
- UNLOCK_GROUP(grp);
- return r;
- }
/** Manually subtract 'decr' (which may be negative, to add tokens) from
 * bev's individual read bucket, suspending reads when the limit crosses
 * into <= 0 and unsuspending when it crosses back above 0.
 *
 * Requires that a rate limit is configured on 'bev' (asserted).
 * Returns 0 on success, -1 if the refill event could not be scheduled. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Keep the refill timer if the write side still needs it. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
/** Manually subtract 'decr' (which may be negative, to add tokens) from
 * bev's individual write bucket, suspending writes when the limit crosses
 * into <= 0 and unsuspending when it crosses back above 0.
 *
 * Requires that a rate limit is configured on 'bev' (asserted).
 * Returns 0 on success, -1 if the refill event could not be scheduled. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Keep the refill timer if the read side still needs it. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
/** Manually subtract 'decr' (may be negative) from a group's read bucket,
 * suspending or unsuspending reading for the whole group as the limit
 * crosses zero.  Always returns 0. */
int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}
/** Manually subtract 'decr' (may be negative) from a group's write bucket,
 * suspending or unsuspending writing for the whole group as the limit
 * crosses zero.  Always returns 0. */
int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}
- void
- bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
- ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
- {
- EVUTIL_ASSERT(grp != NULL);
- if (total_read_out)
- *total_read_out = grp->total_read;
- if (total_written_out)
- *total_written_out = grp->total_written;
- }
- void
- bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
- {
- grp->total_read = grp->total_written = 0;
- }
- int
- bufferevent_ratelim_init_(struct bufferevent_private *bev)
- {
- bev->rate_limiting = NULL;
- bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
- bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
- return 0;
- }
|