diff --git a/Makefile.am b/Makefile.am
index 08fb0fdd..52b15023 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -105,7 +105,7 @@ event-config.h: config.h
 CORE_SRC = event.c evthread.c buffer.c \
 	bufferevent.c bufferevent_sock.c bufferevent_filter.c \
-	bufferevent_pair.c listener.c \
+	bufferevent_pair.c listener.c bufferevent_ratelim.c \
 	evmap.c log.c evutil.c strlcpy.c $(SYS_SRC)
 EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c
@@ -136,7 +136,8 @@ noinst_HEADERS = util-internal.h mm-internal.h ipv6-internal.h \
 	evrpc-internal.h strlcpy-internal.h evbuffer-internal.h \
 	bufferevent-internal.h http-internal.h event-internal.h \
 	evthread-internal.h ht-internal.h defer-internal.h \
-	minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h
+	minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h \
+	ratelim-internal.h
 
 include_HEADERS = event.h evhttp.h evdns.h evrpc.h evutil.h
diff --git a/bufferevent-internal.h b/bufferevent-internal.h
index c3c33eca..220cdeb7 100644
--- a/bufferevent-internal.h
+++ b/bufferevent-internal.h
@@ -35,6 +35,7 @@ extern "C" {
 #include "defer-internal.h"
 #include "evthread-internal.h"
 #include "event2/thread.h"
+#include "ratelim-internal.h"
 
 /* These flags are reasons that we might be declining to actually enable
    reading or writing on a bufferevent.
 */
@@ -43,22 +44,77 @@ extern "C" {
 /* On a all bufferevents, for reading: used when we have read up to the
    watermark value.
 
-   On a filtering bufferxevent, for writing: used when the underlying
+   On a filtering bufferevent, for writing: used when the underlying
    bufferevent's write buffer has been filled up to its watermark
    value.
 */
 #define BEV_SUSPEND_WM 0x01
-/* On a base bufferevent: when we have used up our bandwidth buckets. */
+/* On a base bufferevent: when we have emptied a bandwidth bucket. */
 #define BEV_SUSPEND_BW 0x02
+/* On a base bufferevent: when we have emptied the group's bandwidth bucket. */
+#define BEV_SUSPEND_BW_GROUP 0x04
 /* On a socket bufferevent: we aren't going to try reading until the
  * connect operation is done. */
-#define BEV_SUSPEND_CONNECTING 0x04
+#define BEV_SUSPEND_CONNECTING 0x08
 
-struct token_bucket {
-	ev_uint32_t limit;
-	ev_uint32_t rate;
-	ev_uint32_t burst;
-	unsigned last_updated;
+struct bufferevent_rate_limit_group {
+	/** List of all members in the group */
+	TAILQ_HEAD(rlim_group_member_list, bufferevent_private) members;
+	/** Current limits for the group. */
+	struct ev_token_bucket rate_limit;
+	struct ev_token_bucket_cfg rate_limit_cfg;
+
+	/** True iff we don't want to read from any member of the group
+	 * until the token bucket refills. */
+	unsigned read_suspended : 1;
+	/** True iff we don't want to write from any member of the group
+	 * until the token bucket refills. */
+	unsigned write_suspended : 1;
+	/** True iff we were unable to suspend one of the bufferevents in the
+	 * group for reading the last time we tried, and we should try
+	 * again. */
+	unsigned pending_unsuspend_read : 1;
+	/** True iff we were unable to suspend one of the bufferevents in the
+	 * group for writing the last time we tried, and we should try
+	 * again. */
+	unsigned pending_unsuspend_write : 1;
+
+	/** The number of bufferevents in the group. */
+	int n_members;
+
+	/** The smallest number of bytes that any member of the group should
+	 * be limited to read or write at a time. */
+	ev_uint32_t min_share;
+	/** Timeout event that goes off once a tick, when the bucket is ready
+	 * to refill.
+	 */
+	struct event master_refill_event;
+	/** Lock to protect the members of this group.  This lock should nest
+	 * within every bufferevent lock: if you are holding this lock, do
+	 * not assume you can lock another bufferevent. */
+	void *lock;
+};
+
+/** Fields for rate-limiting a single bufferevent. */
+struct bufferevent_rate_limit {
+	/* Linked-list elements for storing this bufferevent_private in a
+	 * group.
+	 *
+	 * Note that this field is supposed to be protected by the group
+	 * lock */
+	TAILQ_ENTRY(bufferevent_private) next_in_group;
+	/** The rate-limiting group for this bufferevent, or NULL if it is
+	 * only rate-limited on its own. */
+	struct bufferevent_rate_limit_group *group;
+
+	/* This bufferevent's current limits. */
+	struct ev_token_bucket limit;
+	/* Pointer to the rate-limit configuration for this bufferevent.
+	 * Can be shared.  XXX reference-count this? */
+	struct ev_token_bucket_cfg *cfg;
+
+	/* Timeout event used when one of this bufferevent's buckets is
+	 * empty. */
+	struct event refill_bucket_event;
 };
 
 /** Parts of the bufferevent structure that are shared among all bufferevent
@@ -111,6 +167,9 @@ struct bufferevent_private {
 	/** Lock for this bufferevent.  Shared by the inbuf and the outbuf.
 	 * If NULL, locking is disabled. */
 	void *lock;
+
+	/** Rate-limiting information for this bufferevent */
+	struct bufferevent_rate_limit *rate_limiting;
 };
 
 /** Possible operations for a control callback. */
@@ -170,6 +229,7 @@ struct bufferevent_ops {
 	/** Called to access miscellaneous fields. */
 	int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op,
 	    union bufferevent_ctrl_data *);
+
 };
 
 extern const struct bufferevent_ops bufferevent_ops_socket;
@@ -287,6 +347,15 @@ void _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
 		EVLOCK_UNLOCK(locking->lock, 0);		\
 	} while(0)
 
+/* ==== For rate-limiting. */
+
+int _bufferevent_decrement_write_buckets(struct bufferevent_private *bev,
+    int bytes);
+int _bufferevent_decrement_read_buckets(struct bufferevent_private *bev,
+    int bytes);
+int _bufferevent_get_read_max(struct bufferevent_private *bev);
+int _bufferevent_get_write_max(struct bufferevent_private *bev);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/bufferevent.c b/bufferevent.c
index 96b8ec7d..94a00714 100644
--- a/bufferevent.c
+++ b/bufferevent.c
@@ -525,6 +525,15 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev)
 	evbuffer_free(bufev->input);
 	evbuffer_free(bufev->output);
 
+	if (bufev_private->rate_limiting) {
+		if (bufev_private->rate_limiting->group)
+			bufferevent_remove_from_rate_limit_group(bufev);
+		if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
+			event_del(&bufev_private->rate_limiting->refill_bucket_event);
+		mm_free(bufev_private->rate_limiting);
+		bufev_private->rate_limiting = NULL;
+	}
+
 	BEV_UNLOCK(bufev);
 	if (bufev_private->own_lock)
 		EVTHREAD_FREE_LOCK(bufev_private->lock,
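The internal entry points declared in bufferevent-internal.h above define a simple contract for every backend: ask the buckets how much you may transfer, skip the operation entirely if you were suspended in the meantime, and charge the buckets for the bytes actually transferred. A minimal sketch of that calling pattern follows; my_transport_read() is hypothetical (not part of this patch) and assumes the bufferevent's lock is already held, as it is in the real callbacks:

    /* Hypothetical backend read path; illustration only. */
    static void
    my_transport_read(struct bufferevent_private *bevp, evutil_socket_t fd)
    {
        int n, readmax;

        /* Ask the per-bufferevent and group buckets how much we may
         * read; this can suspend us if the group bucket is empty. */
        readmax = _bufferevent_get_read_max(bevp);
        if (bevp->read_suspended)
            return;

        n = evbuffer_read(bevp->bev.input, fd, readmax);
        if (n > 0)
            _bufferevent_decrement_read_buckets(bevp, n);
    }

The bufferevent_readcb() changes in bufferevent_sock.c below follow exactly this shape, with extra care for the historical howmuch == -1 ("unlimited") convention.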
diff --git a/bufferevent_async.c b/bufferevent_async.c
index a8e92b70..2075301b 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -127,6 +127,8 @@ upcast_write(struct event_overlapped *eo)
 static void
 bev_async_consider_writing(struct bufferevent_async *b)
 {
+	size_t at_most;
+	int limit;
 	/* Don't write if there's a write in progress, or we do not
 	 * want to write. */
 	if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
@@ -135,8 +137,18 @@ bev_async_consider_writing(struct bufferevent_async *b)
 	if (!evbuffer_get_length(b->bev.bev.output))
 		return;
 
+	at_most = evbuffer_get_length(b->bev.bev.output);
+
+	/* XXXX This over-commits. */
+	limit = _bufferevent_get_write_max(&b->bev);
+	if (at_most >= limit)
+		at_most = limit;
+
+	if (b->bev.write_suspended)
+		return;
+
 	/* XXXX doesn't respect low-water mark very well. */
-	if (evbuffer_launch_write(b->bev.bev.output, -1,
+	if (evbuffer_launch_write(b->bev.bev.output, at_most,
 	    &b->write_overlapped)) {
 		EVUTIL_ASSERT(0);/* XXX act sensibly. */
 	} else {
@@ -150,6 +162,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
 	size_t cur_size;
 	size_t read_high;
 	size_t at_most;
+	int limit;
 	/* Don't read if there is a read in progress, or we do not
 	 * want to read. */
 	if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
@@ -166,6 +179,14 @@ bev_async_consider_reading(struct bufferevent_async *b)
 		at_most = 16384; /* FIXME totally magic. */
 	}
 
+	/* XXXX This over-commits. */
+	limit = _bufferevent_get_read_max(&b->bev);
+	if (at_most >= limit)
+		at_most = limit;
+
+	if (b->bev.read_suspended)
+		return;
+
 	if (evbuffer_launch_read(b->bev.bev.input, at_most,
 	    &b->read_overlapped)) {
 		EVUTIL_ASSERT(0);
@@ -304,6 +325,7 @@ read_complete(struct event_overlapped *eo, uintptr_t key,
 
 	if (ok && nbytes) {
 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+		_bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
 		if (bev->readcb != NULL &&
 		    evbuffer_get_length(bev->input) >= bev->wm_read.low)
 			_bufferevent_run_readcb(bev);
@@ -337,7 +359,8 @@ write_complete(struct event_overlapped *eo, uintptr_t key,
 
 	if (ok && nbytes) {
 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-		if (bev->writecb != NULL &&
+		_bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
+		if (bev->writecb != NULL &&
 		    evbuffer_get_length(bev->output) <= bev->wm_write.low)
 			_bufferevent_run_writecb(bev);
 		bev_async_consider_writing(bev_a);
diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c
index f121c5be..e2cb28b1 100644
--- a/bufferevent_openssl.c
+++ b/bufferevent_openssl.c
@@ -524,20 +524,29 @@ do_read(struct bufferevent_openssl *bev_ssl, int n_to_read)
 	/* Requires lock */
 	struct bufferevent *bev = &bev_ssl->bev.bev;
 	struct evbuffer *input = bev->input;
-	int r, n, i, n_used = 0, blocked = 0;
+	int r, n, i, n_used = 0, blocked = 0, atmost;
 	struct evbuffer_iovec space[2];
 
+	atmost = _bufferevent_get_read_max(&bev_ssl->bev);
+	if (n_to_read > atmost)
+		n_to_read = atmost;
+
 	n = evbuffer_reserve_space(input, n_to_read, space, 2);
 	if (n < 0)
 		return -1;
 
 	for (i=0; i<n && !blocked; ++i) {
+		if (bev_ssl->bev.read_suspended)
+			break;
 		r = SSL_read(bev_ssl->ssl, space[i].iov_base, space[i].iov_len);
 		if (r>0) {
 			if (bev_ssl->read_blocked_on_write)
 				clear_rbow(bev_ssl);
 			++n_used;
 			space[i].iov_len = r;
+			/* Not exactly right; we probably want to do
+			 * our rate-limiting on the underlying bytes. */
+			_bufferevent_decrement_read_buckets(&bev_ssl->bev, r);
 		} else {
 			int err = SSL_get_error(bev_ssl->ssl, r);
 			print_err(err);
@@ -585,6 +594,8 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
 
 	if (bev_ssl->last_write > 0)
 		atmost = bev_ssl->last_write;
+	else
+		atmost = _bufferevent_get_write_max(&bev_ssl->bev);
 
 	n = evbuffer_peek(output, atmost, NULL, space, 8);
 	if (n < 0)
@@ -593,6 +604,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
 	if (n > 8)
 		n = 8;
 	for (i=0; i < n; ++i) {
+		if (bev_ssl->bev.write_suspended)
+			break;
+
 		r = SSL_write(bev_ssl->ssl, space[i].iov_base,
 		    space[i].iov_len);
 		if (r > 0) {
@@ -600,6 +614,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
 			clear_wbor(bev_ssl);
 			n_written += r;
 			bev_ssl->last_write = -1;
+			/* Not exactly right; we probably want to do
+			 * our rate-limiting on the underlying bytes. */
+			_bufferevent_decrement_write_buckets(&bev_ssl->bev, r);
 		} else {
 			int err = SSL_get_error(bev_ssl->ssl, r);
 			print_err(err);
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
new file mode 100644
index 00000000..564367cb
--- /dev/null
+++ b/bufferevent_ratelim.c
@@ -0,0 +1,654 @@
+/*
+ * Copyright (c) 2007-2009 Niels Provos and Nick Mathewson
+ * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <sys/types.h>
+#include <limits.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "event2/event.h"
+#include "event2/event_struct.h"
+#include "event2/util.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_struct.h"
+#include "event2/buffer.h"
+
+#include "ratelim-internal.h"
+
+#include "bufferevent-internal.h"
+#include "mm-internal.h"
+#include "util-internal.h"
+
+int
+ev_token_bucket_init(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick,
+    int reinitialize)
+{
+	if (reinitialize) {
+		/* on reinitialization, we only clip downwards, since we've
+		   already used who-knows-how-much bandwidth this tick.  We
+		   leave "last_updated" as it is; the next update will add the
+		   appropriate amount of bandwidth to the bucket.
+		 */
+		if (bucket->read_limit > cfg->read_maximum)
+			bucket->read_limit = cfg->read_maximum;
+		if (bucket->write_limit > cfg->write_maximum)
+			bucket->write_limit = cfg->write_maximum;
+	} else {
+		bucket->read_limit = cfg->read_rate;
+		bucket->write_limit = cfg->write_rate;
+		bucket->last_updated = current_tick;
+	}
+	return 0;
+}
+
+int
+ev_token_bucket_update(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick)
+{
+	/* It's okay if the tick number overflows, since we'll just
+	 * wrap around when we do the unsigned subtraction. */
+	unsigned n_ticks = current_tick - bucket->last_updated;
+
+	/* Make sure some ticks actually happened, and that time didn't
+	 * roll back. */
+	if (n_ticks == 0 || n_ticks > INT_MAX)
+		return 0;
+
+	/* Naively, we would say
+		bucket->limit += n_ticks * cfg->rate;
+
+		if (bucket->limit > cfg->maximum)
+			bucket->limit = cfg->maximum;
+
+	   But we're worried about overflow, so we do it like this:
+	*/
+
+	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
+		bucket->read_limit = cfg->read_maximum;
+	else
+		bucket->read_limit += n_ticks * cfg->read_rate;
+
+	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
+		bucket->write_limit = cfg->write_maximum;
+	else
+		bucket->write_limit += n_ticks * cfg->write_rate;
+
+	bucket->last_updated = current_tick;
+
+	return 1;
+}
+
+ev_uint32_t
+ev_token_bucket_get_tick(const struct timeval *tv,
+    const struct ev_token_bucket_cfg *cfg)
+{
+	/* This computation uses two multiplies and a divide.  We could do
+	 * fewer if we knew that the tick length was an integer number of
+	 * seconds, or if we knew it divided evenly into a second.  We should
+	 * investigate that more.
+	 */
+
+	/* We cast to an ev_uint64_t first, since we don't want to overflow
+	 * before we do the final divide. */
+	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
+	return (unsigned)(msec / cfg->msec_per_tick);
+}
+
+struct ev_token_bucket_cfg *
+ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst,
+    ev_uint32_t write_rate, ev_uint32_t write_burst,
+    const struct timeval *tick_len)
+{
+	struct ev_token_bucket_cfg *r;
+	struct timeval g;
+	if (! tick_len) {
+		g.tv_sec = 1;
+		g.tv_usec = 0;
+		tick_len = &g;
+	}
+	if (read_rate > read_burst || write_rate > write_burst ||
+	    read_rate < 1 || write_rate < 1)
+		return NULL;
+	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
+	if (!r)
+		return NULL;
+	r->read_rate = read_rate;
+	r->write_rate = write_rate;
+	r->read_maximum = read_burst;
+	r->write_maximum = write_burst;
+	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
+	r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
+	return r;
+}
+
+void
+ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
+{
+	mm_free(cfg);
+}
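Two pieces of arithmetic above deserve concrete numbers. Ticks: with tick_len = 250 msec, msec_per_tick is 250, so a cached timestamp of 2.5 seconds falls in tick 2500 / 250 = 10. Refills: rather than computing n_ticks * rate, which can overflow, ev_token_bucket_update() asks whether the remaining headroom per tick is smaller than the rate. A standalone check of that guard, with the types inlined (illustration, not patch code):

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        int32_t read_limit = 900;            /* current bucket level */
        const uint32_t read_maximum = 1000;  /* burst cap */
        const uint32_t read_rate = 300;      /* bytes per tick */
        const uint32_t n_ticks = 2;

        /* "Would limit + n_ticks*rate pass the maximum?", asked without
         * ever forming the possibly-overflowing product. */
        if ((read_maximum - read_limit) / n_ticks < read_rate)
            read_limit = read_maximum;
        else
            read_limit += n_ticks * read_rate;

        printf("%d\n", read_limit);  /* 1000: clamped at the burst cap */
        return 0;
    }

The same guard works when the limit is negative from deficit spending: the unsigned subtraction read_maximum - read_limit wraps around to exactly the larger headroom.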
+
+/* No matter how big our bucket gets, don't try to read more than this
+ * much in a single read operation. */
+#define MAX_TO_READ_EVER 16384
+/* No matter how big our bucket gets, don't try to write more than this
+ * much in a single write operation. */
+#define MAX_TO_WRITE_EVER 16384
+
+#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
+#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
+
+static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
+static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
+
+/** Helper: figure out the maximum amount we should write if is_write, or
+    the maximum amount we should read otherwise.  Return that maximum, or
+    0 if our bucket is wholly exhausted.
+ */
+static inline int
+_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
+{
+	/* needs lock on bev. */
+	int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
+	struct timeval now;
+
+#define LIM(x)						\
+	(is_write ? (x).write_limit : (x).read_limit)
+
+#define GROUP_SUSPENDED(g)				\
+	(is_write ? (g)->write_suspended : (g)->read_suspended)
+
+	/* Sets max_so_far to MIN(x, max_so_far) */
+#define CLAMPTO(x)					\
+	do {						\
+		if (max_so_far > (x))			\
+			max_so_far = (x);		\
+	} while (0);
+
+	if (!bev->rate_limiting)
+		return max_so_far;
+
+	/* If rate-limiting is enabled at all, update the appropriate
+	   bucket, and take the smaller of our rate limit and the group
+	   rate limit.
+	 */
+
+	if (bev->rate_limiting->cfg) {
+		unsigned tick;
+		event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+		tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
+		ev_token_bucket_update(&bev->rate_limiting->limit,
+		    bev->rate_limiting->cfg, tick);
+		max_so_far = LIM(bev->rate_limiting->limit);
+	}
+	if (bev->rate_limiting->group) {
+		struct bufferevent_rate_limit_group *g =
+		    bev->rate_limiting->group;
+		ev_uint32_t share;
+		LOCK_GROUP(g);
+		if (GROUP_SUSPENDED(g)) {
+			/* We can get here if we failed to lock this
+			 * particular bufferevent while suspending the whole
+			 * group. */
+			if (is_write)
+				bufferevent_suspend_write(&bev->bev,
+				    BEV_SUSPEND_BW_GROUP);
+			else
+				bufferevent_suspend_read(&bev->bev,
+				    BEV_SUSPEND_BW_GROUP);
+			share = 0;
+		} else {
+			/* XXXX probably we should divide among the active
+			 * members, not the total members. */
+			share = LIM(g->rate_limit) / g->n_members;
+			if (share < g->min_share)
+				share = g->min_share;
+		}
+		UNLOCK_GROUP(g);
+		CLAMPTO(share);
+	}
+
+	return max_so_far;
+}
+
+int
+_bufferevent_get_read_max(struct bufferevent_private *bev)
+{
+	return _bufferevent_get_rlim_max(bev, 0);
+}
+
+int
+_bufferevent_get_write_max(struct bufferevent_private *bev)
+{
+	return _bufferevent_get_rlim_max(bev, 1);
+}
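To put numbers on the group share computed above: each member may claim an equal slice of whatever is left in the group bucket, but never less than min_share, so a nearly-empty bucket is not dribbled out a few bytes per member. Standalone illustration (not patch code):

    #include <stdio.h>

    static unsigned
    group_share(unsigned group_limit, unsigned n_members, unsigned min_share)
    {
        unsigned share = group_limit / n_members;
        if (share < min_share)
            share = min_share;
        return share;
    }

    int main(void)
    {
        printf("%u\n", group_share(4096, 10, 64)); /* 409 */
        printf("%u\n", group_share(100, 10, 64));  /* 64: min_share floor */
        return 0;
    }

Because all ten members may independently take their 409-byte slice in the same tick, the group can over-commit (hence the XXXX comments); the signed buckets absorb the resulting deficit and repay it on later ticks.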
+
+int
+_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
+{
+	/* need to hold lock on bev */
+	if (!bev->rate_limiting)
+		return 0;
+
+	if (bev->rate_limiting->cfg) {
+		bev->rate_limiting->limit.read_limit -= bytes;
+		if (bev->rate_limiting->limit.read_limit <= 0) {
+			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
+			event_add(&bev->rate_limiting->refill_bucket_event,
+			    &bev->rate_limiting->cfg->tick_timeout);
+		}
+	}
+
+	if (bev->rate_limiting->group) {
+		LOCK_GROUP(bev->rate_limiting->group);
+		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
+		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
+			_bev_group_suspend_reading(bev->rate_limiting->group);
+		}
+		UNLOCK_GROUP(bev->rate_limiting->group);
+	}
+
+	return 0;
+}
+
+int
+_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes)
+{
+	/* need to hold lock */
+	if (!bev->rate_limiting)
+		return 0;
+
+	if (bev->rate_limiting->cfg) {
+		bev->rate_limiting->limit.write_limit -= bytes;
+		if (bev->rate_limiting->limit.write_limit <= 0) {
+			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
+			event_add(&bev->rate_limiting->refill_bucket_event,
+			    &bev->rate_limiting->cfg->tick_timeout);
+		}
+	}
+
+	if (bev->rate_limiting->group) {
+		LOCK_GROUP(bev->rate_limiting->group);
+		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
+		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
+			_bev_group_suspend_writing(bev->rate_limiting->group);
+		}
+		UNLOCK_GROUP(bev->rate_limiting->group);
+	}
+
+	return 0;
+}
+
+/** Stop reading on every bufferevent in g */
+static int
+_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
+{
+	/* Needs group lock */
+	struct bufferevent_private *bev;
+	g->read_suspended = 1;
+	g->pending_unsuspend_read = 0;
+
+	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
+	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
+	   the bufferevent locks.  If we are unable to lock any individual
+	   bufferevent, it will find out later when it looks at its limit
+	   and sees that its group is suspended.)
+	*/
+	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
+		if (EVLOCK_TRY_LOCK(bev->lock)) {
+			bufferevent_suspend_read(&bev->bev,
+			    BEV_SUSPEND_BW_GROUP);
+			EVLOCK_UNLOCK(bev->lock, 0);
+		}
+	}
+	return 0;
+}
+
+/** Stop writing on every bufferevent in g */
+static int
+_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
+{
+	/* Needs group lock */
+	struct bufferevent_private *bev;
+	g->write_suspended = 1;
+	g->pending_unsuspend_write = 0;
+	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
+		if (EVLOCK_TRY_LOCK(bev->lock)) {
+			bufferevent_suspend_write(&bev->bev,
+			    BEV_SUSPEND_BW_GROUP);
+			EVLOCK_UNLOCK(bev->lock, 0);
+		}
+	}
+	return 0;
+}
+
+/** Timer callback invoked on a single bufferevent with one or more exhausted
+    buckets when they are ready to refill. */
+static void
+_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
+{
+	unsigned tick;
+	struct timeval now;
+	struct bufferevent_private *bev = arg;
+	int again = 0;
+	BEV_LOCK(&bev->bev);
+	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
+		BEV_UNLOCK(&bev->bev);
+		return;
+	}
+
+	/* First, update the bucket */
+	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+	tick = ev_token_bucket_get_tick(&now,
+	    bev->rate_limiting->cfg);
+	ev_token_bucket_update(&bev->rate_limiting->limit,
+	    bev->rate_limiting->cfg,
+	    tick);
+
+	/* Now unsuspend any read/write operations as appropriate. */
+	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
+		if (bev->rate_limiting->limit.read_limit > 0)
+			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
+		else
+			again = 1;
+	}
+	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
+		if (bev->rate_limiting->limit.write_limit > 0)
+			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
+		else
+			again = 1;
+	}
+	if (again) {
+		/* One or more of the buckets may need another refill if they
+		   started negative.
+
+		   XXXX if we need to be quiet for more ticks, we should
+		   maybe figure out what timeout we really want.
+		*/
+		event_add(&bev->rate_limiting->refill_bucket_event,
+		    &bev->rate_limiting->cfg->tick_timeout);
+	}
+	BEV_UNLOCK(&bev->bev);
+}
+
+/** Helper: grab a random element from a bufferevent group. */
+static struct bufferevent_private *
+_bev_group_random_element(struct bufferevent_rate_limit_group *group)
+{
+	int which;
+	struct bufferevent_private *bev;
+
+	/* requires group lock */
+
+	if (!group->n_members)
+		return NULL;
+
+	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
+
+	which = random() % group->n_members;
+
+	bev = TAILQ_FIRST(&group->members);
+	while (which--)
+		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
+
+	return bev;
+}
+
+/** Iterate over the elements of a rate-limiting group 'g' with a random
+    starting point, assigning each to the variable 'bev', and executing the
+    block 'block'.
+
+    We do this in a half-baked effort to get fairness among group members.
+    XXX Round-robin or some kind of priority queue would be even more fair.
+ */
+#define FOREACH_RANDOM_ORDER(block)				\
+	do {							\
+		first = _bev_group_random_element(g);		\
+		for (bev = first; bev != TAILQ_END(&g->members); \
+		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
+			block ;					\
+		}						\
+		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
+		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
+			block ;					\
+		}						\
+	} while (0)
+
+/** Callback invoked every tick to add more elements to the group bucket
+    and unsuspend group members as needed.
+ */
+static void
+_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
+{
+	struct bufferevent_rate_limit_group *g = arg;
+	unsigned tick;
+	struct timeval now;
+	int again = 0;
+	struct bufferevent_private *bev, *first;
+
+	event_base_gettimeofday_cached(
+	    event_get_base(&g->master_refill_event), &now);
+
+	LOCK_GROUP(g);
+	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
+	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
+
+	if (g->pending_unsuspend_read ||
+	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
+		g->read_suspended = 0;
+		again = 0;
+		FOREACH_RANDOM_ORDER({
+			if (EVLOCK_TRY_LOCK(bev->lock)) {
+				bufferevent_unsuspend_read(&bev->bev,
+				    BEV_SUSPEND_BW_GROUP);
+				EVLOCK_UNLOCK(bev->lock, 0);
+			} else {
+				again = 1;
+			}
+		});
+		g->pending_unsuspend_read = again;
+	}
+	if (g->pending_unsuspend_write ||
+	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
+		g->write_suspended = 0;
+		again = 0;
+		FOREACH_RANDOM_ORDER({
+			if (EVLOCK_TRY_LOCK(bev->lock)) {
+				bufferevent_unsuspend_write(&bev->bev,
+				    BEV_SUSPEND_BW_GROUP);
+				EVLOCK_UNLOCK(bev->lock, 0);
+			} else {
+				again = 1;
+			}
+		});
+		g->pending_unsuspend_write = again;
+	}
+
+	/* XXXX Rather than waiting to the next tick to unsuspend stuff
+	 * with pending_unsuspend_write/read, we should do it on the
+	 * next iteration of the mainloop.
+	 */
+
+	UNLOCK_GROUP(g);
+}
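The random starting point matters because both loops in FOREACH_RANDOM_ORDER walk the member list in order; always starting from the head would let the same early members drain a small bucket before later members ever woke up. The two passes are equivalent to iterating modulo the member count from a random offset, as in this standalone sketch (rand() stands in for the random() call used above):

    #include <stdio.h>
    #include <stdlib.h>

    static void
    visit_from_random_start(int *arr, int n, void (*fn)(int))
    {
        int start = rand() % n, i;
        for (i = 0; i < n; ++i)
            fn(arr[(start + i) % n]);  /* each element exactly once */
    }

    static void print_elt(int v) { printf("%d ", v); }

    int main(void)
    {
        int members[5] = { 1, 2, 3, 4, 5 };
        visit_from_random_start(members, 5, print_elt);
        putchar('\n');
        return 0;
    }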
+ */ + + UNLOCK_GROUP(g); +} + +int +bufferevent_set_rate_limit(struct bufferevent *bev, + struct ev_token_bucket_cfg *cfg) +{ + struct bufferevent_private *bevp = + EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + int r = -1; + struct bufferevent_rate_limit *rlim; + struct timeval now; + ev_uint32_t tick; + /* XXX reference-count cfg */ + + BEV_LOCK(bev); + + if (cfg == NULL) { + if (bevp->rate_limiting) { + bevp->rate_limiting->cfg = NULL; + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + } + r = 0; + goto done; + } + + event_base_gettimeofday_cached(bev->ev_base, &now); + tick = ev_token_bucket_get_tick(&now, cfg); + + if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { + ; + } else if (bevp->rate_limiting) { + bevp->rate_limiting->cfg = cfg; + ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1); + if (bevp->rate_limiting->limit.read_limit > 0) + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + else + bufferevent_suspend_read(bev, BEV_SUSPEND_BW); + if (bevp->rate_limiting->limit.write_limit > 0) + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + else + bufferevent_suspend_write(bev, BEV_SUSPEND_BW); + } else { + rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); + if (!rlim) + goto done; + rlim->cfg = cfg; + ev_token_bucket_init(&rlim->limit, cfg, tick, 0); + evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, + _bev_refill_callback, bevp); + bevp->rate_limiting = rlim; + } + r = 0; +done: + BEV_UNLOCK(bev); + return r; +} + +struct bufferevent_rate_limit_group * +bufferevent_rate_limit_group_new(struct event_base *base, + const struct ev_token_bucket_cfg *cfg) +{ + struct bufferevent_rate_limit_group *g; + struct timeval now; + ev_uint32_t tick; + + event_base_gettimeofday_cached(base, &now); + tick = ev_token_bucket_get_tick(&now, cfg); + + g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); + if (!g) + return NULL; + memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); + TAILQ_INIT(&g->members); + + ev_token_bucket_init(&g->rate_limit, cfg, tick, 0); + + g->min_share = 64; + event_assign(&g->master_refill_event, base, -1, EV_PERSIST, + _bev_group_refill_callback, g); + event_add(&g->master_refill_event, &cfg->tick_timeout); + + EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); + return g; +} + +int +bufferevent_add_to_rate_limit_group(struct bufferevent *bev, + struct bufferevent_rate_limit_group *g) +{ + int wsuspend, rsuspend; + struct bufferevent_private *bevp = + EVUTIL_UPCAST(bev, struct bufferevent_private, bev); + BEV_LOCK(bev); + + if (!bevp->rate_limiting) { + struct bufferevent_rate_limit *rlim; + rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); + if (!rlim) { + BEV_UNLOCK(bev); + return -1; + } + evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, + _bev_refill_callback, bevp); + bevp->rate_limiting = rlim; + } + + if (bevp->rate_limiting->group == g) { + BEV_UNLOCK(bev); + return 0; + } + if (bevp->rate_limiting->group) + bufferevent_remove_from_rate_limit_group(bev); + + LOCK_GROUP(g); + bevp->rate_limiting->group = g; + ++g->n_members; + TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group); + + rsuspend = g->read_suspended; + wsuspend = g->write_suspended; + + UNLOCK_GROUP(g); + + if (rsuspend) + bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP); + if (wsuspend) + bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP); + + BEV_UNLOCK(bev); + return 0; +} + +int +bufferevent_remove_from_rate_limit_group(struct 
+
+int
+bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
+{
+	struct bufferevent_private *bevp =
+	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+	BEV_LOCK(bev);
+	if (bevp->rate_limiting && bevp->rate_limiting->group) {
+		struct bufferevent_rate_limit_group *g =
+		    bevp->rate_limiting->group;
+		LOCK_GROUP(g);
+		bevp->rate_limiting->group = NULL;
+		--g->n_members;
+		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
+		UNLOCK_GROUP(g);
+	}
+	bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
+	bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
+	BEV_UNLOCK(bev);
+	return 0;
+}
diff --git a/bufferevent_sock.c b/bufferevent_sock.c
index 61a369f6..f53689ee 100644
--- a/bufferevent_sock.c
+++ b/bufferevent_sock.c
@@ -118,10 +118,12 @@ static void
 bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 {
 	struct bufferevent *bufev = arg;
+	struct bufferevent_private *bufev_p =
+	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 	struct evbuffer *input;
 	int res = 0;
 	short what = BEV_EVENT_READING;
-	int howmuch = -1;
+	int howmuch = -1, readmax=-1;
 
 	_bufferevent_incref_and_lock(bufev);
 
@@ -144,6 +146,12 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 			goto done;
 		}
 	}
+	readmax = _bufferevent_get_read_max(bufev_p);
+	if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
+					       * uglifies this code. */
+		howmuch = readmax;
+	if (bufev_p->read_suspended)
+		goto done;
 
 	evbuffer_unfreeze(input, 0);
 	res = evbuffer_read(input, fd, howmuch);
@@ -163,6 +171,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 
 	if (res <= 0)
 		goto error;
+	_bufferevent_decrement_read_buckets(bufev_p, res);
 
 	/* Invoke the user callback - must always be called last */
 	if (evbuffer_get_length(input) >= bufev->wm_read.low &&
@@ -191,6 +200,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
 	int res = 0;
 	short what = BEV_EVENT_WRITING;
 	int connected = 0;
+	int atmost = -1;
 
 	_bufferevent_incref_and_lock(bufev);
 
@@ -232,9 +242,14 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
 		}
 	}
 
+	atmost = _bufferevent_get_write_max(bufev_p);
+
+	if (bufev_p->write_suspended)
+		goto done;
+
 	if (evbuffer_get_length(bufev->output)) {
 		evbuffer_unfreeze(bufev->output, 1);
-		res = evbuffer_write(bufev->output, fd);
+		res = evbuffer_write_atmost(bufev->output, fd, atmost);
 		evbuffer_freeze(bufev->output, 1);
 		if (res == -1) {
 			int err = evutil_socket_geterror(fd);
@@ -250,6 +265,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
 	}
 	if (res <= 0)
 		goto error;
+
+	_bufferevent_decrement_write_buckets(bufev_p, res);
 
 	if (evbuffer_get_length(bufev->output) == 0)
diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h
index 523a7fce..48d56bd6 100644
--- a/include/event2/bufferevent.h
+++ b/include/event2/bufferevent.h
@@ -499,6 +499,97 @@ int bufferevent_pair_new(struct event_base *base, int options,
     struct bufferevent *pair[2]);
 
+
+/**
+   Abstract type used to configure rate-limiting on a bufferevent or a group
+   of bufferevents.
+ */
+struct ev_token_bucket_cfg;
+
+/**
+   A group of bufferevents which are configured to respect the same rate
+   limit.
+*/
+struct bufferevent_rate_limit_group;
+
+/**
+   Initialize and return a new object to configure the rate-limiting behavior
+   of bufferevents.
+
+   @param read_rate The maximum number of bytes to read per tick on
+     average.
+   @param read_burst The maximum number of bytes to read in any single tick.
+   @param write_rate The maximum number of bytes to write per tick on
+     average.
+   @param write_burst The maximum number of bytes to write in any single
+     tick.
+   @param tick_len The length of a single tick.  Defaults to one second.
+     Any fractions of a millisecond are ignored.
+
+   Note that all rate-limits here are currently best-effort: future versions
+   of Libevent may implement them more tightly.
+ */
+struct ev_token_bucket_cfg *ev_token_bucket_cfg_new(
+	ev_uint32_t read_rate, ev_uint32_t read_burst,
+	ev_uint32_t write_rate, ev_uint32_t write_burst,
+	const struct timeval *tick_len);
+
+/** Free all storage held in 'cfg'.
+
+    Note: 'cfg' is not currently reference-counted; it is not safe to free it
+    until no bufferevent is using it.
+ */
+void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg);
+
+/**
+   Set the rate-limit of the bufferevent 'bev' to the one specified in
+   'cfg'.  If 'cfg' is NULL, disable any per-bufferevent rate-limiting on
+   'bev'.
+
+   Note that only some bufferevent types currently respect rate-limiting.
+   They are: socket-based bufferevents (normal and IOCP-based), and SSL-based
+   bufferevents.
+
+   Return 0 on success, -1 on failure.
+ */
+int bufferevent_set_rate_limit(struct bufferevent *bev,
+    struct ev_token_bucket_cfg *cfg);
+
+/**
+   Create a new rate-limit group for bufferevents.  A rate-limit group
+   constrains the maximum number of bytes sent and received, in toto,
+   by all of its bufferevents.
+
+   @param base An event_base to run any necessary timeouts for the group.
+     Note that the bufferevents in the group do not need to share this
+     event_base.
+   @param cfg The rate-limit for this group.
+
+   Note that all rate-limits here are currently best-effort: future versions
+   of Libevent may implement them more tightly.
+
+   Note also that only some bufferevent types currently respect rate-limiting.
+   They are: socket-based bufferevents (normal and IOCP-based), and SSL-based
+   bufferevents.
+ */
+struct bufferevent_rate_limit_group *bufferevent_rate_limit_group_new(
+	struct event_base *base,
+	const struct ev_token_bucket_cfg *cfg);
+
+/*XXX we need a bufferevent_rate_limit_group_set_cfg */
+
+/**
+   Add 'bev' to the list of bufferevents whose aggregate reading and writing
+   is restricted by 'g'.  If 'g' is NULL, remove 'bev' from its current group.
+
+   A bufferevent may belong to no more than one rate-limit group at a time.
+   If 'bev' is already a member of a group, it will be removed from its old
+   group before being added to 'g'.
+
+   Return 0 on success and -1 on failure.
+ */
+int bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
+    struct bufferevent_rate_limit_group *g);
+
+/** Remove 'bev' from its current rate-limit group (if any). */
+int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/include/event2/util.h b/include/event2/util.h
index 98c26b61..e50e434b 100644
--- a/include/event2/util.h
+++ b/include/event2/util.h
@@ -93,12 +93,16 @@ extern "C" {
 
 #ifdef _EVENT_HAVE_UINT32_T
 #define ev_uint32_t uint32_t
+#define ev_int32_t int32_t
 #elif defined(WIN32)
 #define ev_uint32_t unsigned int
+#define ev_int32_t signed int
 #elif _EVENT_SIZEOF_LONG == 4
 #define ev_uint32_t unsigned long
+#define ev_int32_t signed long
 #elif _EVENT_SIZEOF_INT == 4
 #define ev_uint32_t unsigned int
+#define ev_int32_t signed int
 #else
 #error "No way to define ev_uint32_t"
 #endif
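For orientation, here is how the public API declared in event2/bufferevent.h above fits together; the helper and the numbers are illustrative, not part of the patch, and error checking is elided:

    #include <event2/event.h>
    #include <event2/bufferevent.h>

    static struct bufferevent_rate_limit_group *
    limit_two_connections(struct event_base *base,
        struct bufferevent *bev1, struct bufferevent *bev2)
    {
        struct timeval quarter_sec = { 0, 250000 };
        /* Per connection: 6400 bytes per 250-msec tick (~25600 bytes/sec
         * each way), bursting to at most 16384 bytes within one tick. */
        struct ev_token_bucket_cfg *cfg =
            ev_token_bucket_cfg_new(6400, 16384, 6400, 16384, &quarter_sec);
        /* Aggregate: 25600 bytes per tick (~100 KB/sec) across members. */
        struct ev_token_bucket_cfg *gcfg =
            ev_token_bucket_cfg_new(25600, 25600, 25600, 25600, &quarter_sec);
        struct bufferevent_rate_limit_group *grp =
            bufferevent_rate_limit_group_new(base, gcfg);

        bufferevent_set_rate_limit(bev1, cfg);
        bufferevent_set_rate_limit(bev2, cfg);
        bufferevent_add_to_rate_limit_group(bev1, grp);
        bufferevent_add_to_rate_limit_group(bev2, grp);

        /* Neither cfg nor gcfg is reference-counted: both must outlive
         * every bufferevent and group still using them. */
        return grp;
    }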
diff --git a/ratelim-internal.h b/ratelim-internal.h
new file mode 100644
index 00000000..105798ae
--- /dev/null
+++ b/ratelim-internal.h
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2009 Niels Provos and Nick Mathewson
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _RATELIM_INTERNAL_H_
+#define _RATELIM_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "event2/util.h"
+
+/** A token bucket is an internal structure that tracks how many bytes we are
+ * currently willing to read or write on a given bufferevent or group of
+ * bufferevents. */
+struct ev_token_bucket {
+	/** How many bytes are we willing to read or write right now?  These
+	 * values are signed so that we can do "deficit spending". */
+	ev_int32_t read_limit, write_limit;
+	/** When was this bucket last updated?  Measured in abstract 'ticks'
+	 * relative to the token bucket configuration. */
+	ev_uint32_t last_updated;
+};
+
+/** Configuration info for a token bucket or set of token buckets. */
+struct ev_token_bucket_cfg {
+	/** How many bytes are we willing to read on average per tick? */
+	ev_uint32_t read_rate;
+	/** How many bytes are we willing to read at most in any one tick? */
+	ev_uint32_t read_maximum;
+	/** How many bytes are we willing to write on average per tick? */
+	ev_uint32_t write_rate;
+	/** How many bytes are we willing to write at most in any one tick? */
+	ev_uint32_t write_maximum;
+
+	/* How long is a tick?  Note that fractions of a millisecond are
+	 * ignored. */
+	struct timeval tick_timeout;
+
+	/* How long is a tick, in milliseconds?  Derived from tick_timeout. */
+	unsigned msec_per_tick;
+};
+
+/** The current tick is 'current_tick': add bytes to 'bucket' as specified in
+ * 'cfg'. */
+int ev_token_bucket_update(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick);
+
+/** In which tick does 'tv' fall according to 'cfg'?  Note that ticks can
+ * overflow easily; your code needs to handle this. */
+ev_uint32_t ev_token_bucket_get_tick(const struct timeval *tv,
+    const struct ev_token_bucket_cfg *cfg);
+
+/** Adjust 'bucket' to respect 'cfg', and note that it was last updated in
+ * 'current_tick'.  If 'reinitialize' is true, we are changing the
+ * configuration of 'bucket'; otherwise, we are setting it up for the first
+ * time.
+ */
+int ev_token_bucket_init(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick,
+    int reinitialize);
+
+/** Decrease the read limit of 'b' by 'n' bytes */
+#define ev_token_bucket_decrement_read(b,n)	\
+	do {					\
+		(b)->read_limit -= (n);		\
+	} while (0)
+/** Decrease the write limit of 'b' by 'n' bytes */
+#define ev_token_bucket_decrement_write(b,n)	\
+	do {					\
+		(b)->write_limit -= (n);	\
+	} while (0)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
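One closing note on ratelim-internal.h: the limits are ev_int32_t rather than unsigned precisely so the decrement macros can drive them negative. A single syscall or SSL_write() may consume more bytes than the bucket currently holds; rather than splitting the operation, the code charges the full amount and leaves the bufferevent suspended until per-tick refills repay the deficit. In round, purely illustrative numbers:

    #include <stdio.h>

    int main(void)
    {
        int write_limit = 6400;          /* one tick's allowance */
        const int write_rate = 6400;

        write_limit -= 10000;            /* a large write just completed */
        printf("after write: %d\n", write_limit);    /* -3600: suspended */

        write_limit += write_rate;       /* next tick's refill */
        printf("after refill: %d\n", write_limit);   /* 2800: unsuspended */
        return 0;
    }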