mirror of
https://github.com/libevent/libevent.git
synced 2025-01-09 00:56:20 +08:00
e4b8732701
Found by oss-fuzz, after coverage had been improved in https://github.com/google/oss-fuzz/pull/11257 v2: better check (found by CI for windows)
1112 lines
30 KiB
C
1112 lines
30 KiB
C
/*
|
|
* Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
|
|
* Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* 3. The name of the author may not be used to endorse or promote products
|
|
* derived from this software without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
|
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
|
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
|
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
|
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
#include "evconfig-private.h"
|
|
|
|
#include <sys/types.h>
|
|
#include <limits.h>
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
|
|
#include "event2/event.h"
|
|
#include "event2/event_struct.h"
|
|
#include "event2/util.h"
|
|
#include "event2/bufferevent.h"
|
|
#include "event2/bufferevent_struct.h"
|
|
#include "event2/buffer.h"
|
|
|
|
#include "ratelim-internal.h"
|
|
|
|
#include "bufferevent-internal.h"
|
|
#include "mm-internal.h"
|
|
#include "util-internal.h"
|
|
#include "event-internal.h"
|
|
|
|
int
|
|
ev_token_bucket_init_(struct ev_token_bucket *bucket,
|
|
const struct ev_token_bucket_cfg *cfg,
|
|
ev_uint32_t current_tick,
|
|
int reinitialize)
|
|
{
|
|
if (reinitialize) {
|
|
/* on reinitialization, we only clip downwards, since we've
|
|
already used who-knows-how-much bandwidth this tick. We
|
|
leave "last_updated" as it is; the next update will add the
|
|
appropriate amount of bandwidth to the bucket.
|
|
*/
|
|
if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
|
|
bucket->read_limit = cfg->read_maximum;
|
|
if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
|
|
bucket->write_limit = cfg->write_maximum;
|
|
} else {
|
|
bucket->read_limit = cfg->read_rate;
|
|
bucket->write_limit = cfg->write_rate;
|
|
bucket->last_updated = current_tick;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
ev_token_bucket_update_(struct ev_token_bucket *bucket,
|
|
const struct ev_token_bucket_cfg *cfg,
|
|
ev_uint32_t current_tick)
|
|
{
|
|
/* It's okay if the tick number overflows, since we'll just
|
|
* wrap around when we do the unsigned subtraction. */
|
|
unsigned n_ticks = current_tick - bucket->last_updated;
|
|
|
|
/* Make sure some ticks actually happened, and that time didn't
|
|
* roll back. */
|
|
if (n_ticks == 0 || n_ticks > INT_MAX)
|
|
return 0;
|
|
|
|
/* Naively, we would say
|
|
bucket->limit += n_ticks * cfg->rate;
|
|
|
|
if (bucket->limit > cfg->maximum)
|
|
bucket->limit = cfg->maximum;
|
|
|
|
But we're worried about overflow, so we do it like this:
|
|
*/
|
|
|
|
if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
|
|
bucket->read_limit = cfg->read_maximum;
|
|
else
|
|
bucket->read_limit += n_ticks * cfg->read_rate;
|
|
|
|
|
|
if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
|
|
bucket->write_limit = cfg->write_maximum;
|
|
else
|
|
bucket->write_limit += n_ticks * cfg->write_rate;
|
|
|
|
|
|
bucket->last_updated = current_tick;
|
|
|
|
return 1;
|
|
}
|
|
|
|
static inline void
|
|
bufferevent_update_buckets(struct bufferevent_private *bev)
|
|
{
|
|
/* Must hold lock on bev. */
|
|
struct timeval now;
|
|
unsigned tick;
|
|
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
|
|
tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
|
|
if (tick != bev->rate_limiting->limit.last_updated)
|
|
ev_token_bucket_update_(&bev->rate_limiting->limit,
|
|
bev->rate_limiting->cfg, tick);
|
|
}
|
|
|
|
ev_uint32_t
|
|
ev_token_bucket_get_tick_(const struct timeval *tv,
|
|
const struct ev_token_bucket_cfg *cfg)
|
|
{
|
|
/* This computation uses two multiplies and a divide. We could do
|
|
* fewer if we knew that the tick length was an integer number of
|
|
* seconds, or if we knew it divided evenly into a second. We should
|
|
* investigate that more.
|
|
*/
|
|
|
|
/* We cast to an ev_uint64_t first, since we don't want to overflow
|
|
* before we do the final divide. */
|
|
ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
|
|
return (unsigned)(msec / cfg->msec_per_tick);
|
|
}
|
|
|
|
struct ev_token_bucket_cfg *
|
|
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
|
|
size_t write_rate, size_t write_burst,
|
|
const struct timeval *tick_len)
|
|
{
|
|
struct ev_token_bucket_cfg *r;
|
|
struct timeval g;
|
|
unsigned msec_per_tick;
|
|
|
|
if (! tick_len) {
|
|
g.tv_sec = 1;
|
|
g.tv_usec = 0;
|
|
tick_len = &g;
|
|
}
|
|
|
|
/* Avoid possible overflow.
|
|
* - there is no point in accepting values larger then INT_MAX/1000 anyway
|
|
* - on windows tv_sec (tv_usec) is long, which is int, which has upper value limit INT_MAX
|
|
* - and also negative values does not make any sense
|
|
*/
|
|
if (tick_len->tv_sec < 0 || tick_len->tv_sec > INT_MAX/1000)
|
|
return NULL;
|
|
|
|
/* Note, overflow with tv_usec is not possible since tv_sec is limited to
|
|
* INT_MAX/1000 anyway */
|
|
msec_per_tick = (unsigned)(tick_len->tv_sec * 1000) +
|
|
(tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
|
|
if (!msec_per_tick)
|
|
return NULL;
|
|
|
|
if (read_rate > read_burst || write_rate > write_burst ||
|
|
read_rate < 1 || write_rate < 1)
|
|
return NULL;
|
|
if (read_rate > EV_RATE_LIMIT_MAX ||
|
|
write_rate > EV_RATE_LIMIT_MAX ||
|
|
read_burst > EV_RATE_LIMIT_MAX ||
|
|
write_burst > EV_RATE_LIMIT_MAX)
|
|
return NULL;
|
|
r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
|
|
if (!r)
|
|
return NULL;
|
|
r->read_rate = read_rate;
|
|
r->write_rate = write_rate;
|
|
r->read_maximum = read_burst;
|
|
r->write_maximum = write_burst;
|
|
memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
|
|
r->msec_per_tick = msec_per_tick;
|
|
return r;
|
|
}
|
|
|
|
/** Release a configuration by handing 'cfg' to mm_free(). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
|
|
|
|
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire and release the lock that protects rate-limit group 'g'. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend/unsuspend helpers; they are used before they are
 * defined further down in this file. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
|
|
|
|
/** Helper: figure out the maximum amount we should write if is_write, or
|
|
the maximum amount we should read if is_read. Return that maximum, or
|
|
0 if our bucket is wholly exhausted.
|
|
*/
|
|
static inline ev_ssize_t
|
|
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
|
|
{
|
|
/* needs lock on bev. */
|
|
ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
|
|
|
|
#define LIM(x) \
|
|
(is_write ? (x).write_limit : (x).read_limit)
|
|
|
|
#define GROUP_SUSPENDED(g) \
|
|
(is_write ? (g)->write_suspended : (g)->read_suspended)
|
|
|
|
/* Sets max_so_far to MIN(x, max_so_far) */
|
|
#define CLAMPTO(x) \
|
|
do { \
|
|
if (max_so_far > (x)) \
|
|
max_so_far = (x); \
|
|
} while (0);
|
|
|
|
if (!bev->rate_limiting)
|
|
return max_so_far;
|
|
|
|
/* If rate-limiting is enabled at all, update the appropriate
|
|
bucket, and take the smaller of our rate limit and the group
|
|
rate limit.
|
|
*/
|
|
|
|
if (bev->rate_limiting->cfg) {
|
|
bufferevent_update_buckets(bev);
|
|
max_so_far = LIM(bev->rate_limiting->limit);
|
|
}
|
|
if (bev->rate_limiting->group) {
|
|
struct bufferevent_rate_limit_group *g =
|
|
bev->rate_limiting->group;
|
|
ev_ssize_t share;
|
|
LOCK_GROUP(g);
|
|
if (GROUP_SUSPENDED(g)) {
|
|
/* We can get here if we failed to lock this
|
|
* particular bufferevent while suspending the whole
|
|
* group. */
|
|
if (is_write)
|
|
bufferevent_suspend_write_(&bev->bev,
|
|
BEV_SUSPEND_BW_GROUP);
|
|
else
|
|
bufferevent_suspend_read_(&bev->bev,
|
|
BEV_SUSPEND_BW_GROUP);
|
|
share = 0;
|
|
} else {
|
|
/* XXXX probably we should divide among the active
|
|
* members, not the total members. */
|
|
share = LIM(g->rate_limit) / g->n_members;
|
|
if (share < g->min_share)
|
|
share = g->min_share;
|
|
}
|
|
UNLOCK_GROUP(g);
|
|
CLAMPTO(share);
|
|
}
|
|
|
|
if (max_so_far < 0)
|
|
max_so_far = 0;
|
|
return max_so_far;
|
|
}
|
|
|
|
ev_ssize_t
|
|
bufferevent_get_read_max_(struct bufferevent_private *bev)
|
|
{
|
|
return bufferevent_get_rlim_max_(bev, 0);
|
|
}
|
|
|
|
ev_ssize_t
|
|
bufferevent_get_write_max_(struct bufferevent_private *bev)
|
|
{
|
|
return bufferevent_get_rlim_max_(bev, 1);
|
|
}
|
|
|
|
int
|
|
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
|
|
{
|
|
/* XXXXX Make sure all users of this function check its return value */
|
|
int r = 0;
|
|
/* need to hold lock on bev */
|
|
if (!bev->rate_limiting)
|
|
return 0;
|
|
|
|
if (bev->rate_limiting->cfg) {
|
|
bev->rate_limiting->limit.read_limit -= bytes;
|
|
if (bev->rate_limiting->limit.read_limit <= 0) {
|
|
bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
|
|
if (event_add(&bev->rate_limiting->refill_bucket_event,
|
|
&bev->rate_limiting->cfg->tick_timeout) < 0)
|
|
r = -1;
|
|
} else if (bev->read_suspended & BEV_SUSPEND_BW) {
|
|
if (!(bev->write_suspended & BEV_SUSPEND_BW))
|
|
event_del(&bev->rate_limiting->refill_bucket_event);
|
|
bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
|
|
}
|
|
}
|
|
|
|
if (bev->rate_limiting->group) {
|
|
LOCK_GROUP(bev->rate_limiting->group);
|
|
bev->rate_limiting->group->rate_limit.read_limit -= bytes;
|
|
bev->rate_limiting->group->total_read += bytes;
|
|
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
|
|
bev_group_suspend_reading_(bev->rate_limiting->group);
|
|
} else if (bev->rate_limiting->group->read_suspended) {
|
|
bev_group_unsuspend_reading_(bev->rate_limiting->group);
|
|
}
|
|
UNLOCK_GROUP(bev->rate_limiting->group);
|
|
}
|
|
|
|
return r;
|
|
}
|
|
|
|
int
|
|
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
|
|
{
|
|
/* XXXXX Make sure all users of this function check its return value */
|
|
int r = 0;
|
|
/* need to hold lock */
|
|
if (!bev->rate_limiting)
|
|
return 0;
|
|
|
|
if (bev->rate_limiting->cfg) {
|
|
bev->rate_limiting->limit.write_limit -= bytes;
|
|
if (bev->rate_limiting->limit.write_limit <= 0) {
|
|
bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
|
|
if (event_add(&bev->rate_limiting->refill_bucket_event,
|
|
&bev->rate_limiting->cfg->tick_timeout) < 0)
|
|
r = -1;
|
|
} else if (bev->write_suspended & BEV_SUSPEND_BW) {
|
|
if (!(bev->read_suspended & BEV_SUSPEND_BW))
|
|
event_del(&bev->rate_limiting->refill_bucket_event);
|
|
bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
|
|
}
|
|
}
|
|
|
|
if (bev->rate_limiting->group) {
|
|
LOCK_GROUP(bev->rate_limiting->group);
|
|
bev->rate_limiting->group->rate_limit.write_limit -= bytes;
|
|
bev->rate_limiting->group->total_written += bytes;
|
|
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
|
|
bev_group_suspend_writing_(bev->rate_limiting->group);
|
|
} else if (bev->rate_limiting->group->write_suspended) {
|
|
bev_group_unsuspend_writing_(bev->rate_limiting->group);
|
|
}
|
|
UNLOCK_GROUP(bev->rate_limiting->group);
|
|
}
|
|
|
|
return r;
|
|
}
|
|
|
|
/** Stop reading on every bufferevent in <b>g</b> */
|
|
static int
|
|
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
|
|
{
|
|
/* Needs group lock */
|
|
struct bufferevent_private *bev;
|
|
g->read_suspended = 1;
|
|
g->pending_unsuspend_read = 0;
|
|
|
|
/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
|
|
to prevent a deadlock. (Ordinarily, the group lock nests inside
|
|
the bufferevent locks. If we are unable to lock any individual
|
|
bufferevent, it will find out later when it looks at its limit
|
|
and sees that its group is suspended.)
|
|
*/
|
|
LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
|
|
if (EVLOCK_TRY_LOCK_(bev->lock)) {
|
|
bufferevent_suspend_read_(&bev->bev,
|
|
BEV_SUSPEND_BW_GROUP);
|
|
EVLOCK_UNLOCK(bev->lock, 0);
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/** Stop writing on every bufferevent in <b>g</b> */
|
|
static int
|
|
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
|
|
{
|
|
/* Needs group lock */
|
|
struct bufferevent_private *bev;
|
|
g->write_suspended = 1;
|
|
g->pending_unsuspend_write = 0;
|
|
LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
|
|
if (EVLOCK_TRY_LOCK_(bev->lock)) {
|
|
bufferevent_suspend_write_(&bev->bev,
|
|
BEV_SUSPEND_BW_GROUP);
|
|
EVLOCK_UNLOCK(bev->lock, 0);
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/** Timer callback invoked on a single bufferevent with one or more exhausted
|
|
buckets when they are ready to refill. */
|
|
static void
|
|
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
|
|
{
|
|
unsigned tick;
|
|
struct timeval now;
|
|
struct bufferevent_private *bev = arg;
|
|
int again = 0;
|
|
BEV_LOCK(&bev->bev);
|
|
if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
|
|
BEV_UNLOCK(&bev->bev);
|
|
return;
|
|
}
|
|
|
|
/* First, update the bucket */
|
|
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
|
|
tick = ev_token_bucket_get_tick_(&now,
|
|
bev->rate_limiting->cfg);
|
|
ev_token_bucket_update_(&bev->rate_limiting->limit,
|
|
bev->rate_limiting->cfg,
|
|
tick);
|
|
|
|
/* Now unsuspend any read/write operations as appropriate. */
|
|
if ((bev->read_suspended & BEV_SUSPEND_BW)) {
|
|
if (bev->rate_limiting->limit.read_limit > 0)
|
|
bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
|
|
else
|
|
again = 1;
|
|
}
|
|
if ((bev->write_suspended & BEV_SUSPEND_BW)) {
|
|
if (bev->rate_limiting->limit.write_limit > 0)
|
|
bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
|
|
else
|
|
again = 1;
|
|
}
|
|
if (again) {
|
|
/* One or more of the buckets may need another refill if they
|
|
started negative.
|
|
|
|
XXXX if we need to be quiet for more ticks, we should
|
|
maybe figure out what timeout we really want.
|
|
*/
|
|
/* XXXX Handle event_add failure somehow */
|
|
event_add(&bev->rate_limiting->refill_bucket_event,
|
|
&bev->rate_limiting->cfg->tick_timeout);
|
|
}
|
|
BEV_UNLOCK(&bev->bev);
|
|
}
|
|
|
|
/** Helper: grab a random element from a bufferevent group.
|
|
*
|
|
* Requires that we hold the lock on the group.
|
|
*/
|
|
static struct bufferevent_private *
|
|
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
|
|
{
|
|
int which;
|
|
struct bufferevent_private *bev;
|
|
|
|
/* requires group lock */
|
|
|
|
if (!group->n_members)
|
|
return NULL;
|
|
|
|
EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
|
|
|
|
which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
|
|
|
|
bev = LIST_FIRST(&group->members);
|
|
while (which--)
|
|
bev = LIST_NEXT(bev, rate_limiting->next_in_group);
|
|
|
|
return bev;
|
|
}
|
|
|
|
/** Visit every member of rate-limiting group 'g', starting at a randomly
    chosen member, continuing to the end of the list, and then wrapping
    around from the head of the list up to (not including) the starting
    member.  Each member is assigned to 'bev' and 'block' is executed.

    The expansion relies on the caller's variables 'g', 'bev' and 'first'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first;					\
		    bev != LIST_END(&g->members);			\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		for (bev = LIST_FIRST(&g->members);			\
		    bev && bev != first;				\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
|
|
|
|
static void
|
|
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
|
|
{
|
|
int again = 0;
|
|
struct bufferevent_private *bev, *first;
|
|
|
|
g->read_suspended = 0;
|
|
FOREACH_RANDOM_ORDER({
|
|
if (EVLOCK_TRY_LOCK_(bev->lock)) {
|
|
bufferevent_unsuspend_read_(&bev->bev,
|
|
BEV_SUSPEND_BW_GROUP);
|
|
EVLOCK_UNLOCK(bev->lock, 0);
|
|
} else {
|
|
again = 1;
|
|
}
|
|
});
|
|
g->pending_unsuspend_read = again;
|
|
}
|
|
|
|
static void
|
|
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
|
|
{
|
|
int again = 0;
|
|
struct bufferevent_private *bev, *first;
|
|
g->write_suspended = 0;
|
|
|
|
FOREACH_RANDOM_ORDER({
|
|
if (EVLOCK_TRY_LOCK_(bev->lock)) {
|
|
bufferevent_unsuspend_write_(&bev->bev,
|
|
BEV_SUSPEND_BW_GROUP);
|
|
EVLOCK_UNLOCK(bev->lock, 0);
|
|
} else {
|
|
again = 1;
|
|
}
|
|
});
|
|
g->pending_unsuspend_write = again;
|
|
}
|
|
|
|
/** Callback invoked every tick to add more elements to the group bucket
|
|
and unsuspend group members as needed.
|
|
*/
|
|
static void
|
|
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
|
|
{
|
|
struct bufferevent_rate_limit_group *g = arg;
|
|
unsigned tick;
|
|
struct timeval now;
|
|
|
|
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
|
|
|
|
LOCK_GROUP(g);
|
|
|
|
tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
|
|
ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
|
|
|
|
if (g->pending_unsuspend_read ||
|
|
(g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
|
|
bev_group_unsuspend_reading_(g);
|
|
}
|
|
if (g->pending_unsuspend_write ||
|
|
(g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
|
|
bev_group_unsuspend_writing_(g);
|
|
}
|
|
|
|
/* XXXX Rather than waiting to the next tick to unsuspend stuff
|
|
* with pending_unsuspend_write/read, we should do it on the
|
|
* next iteration of the mainloop.
|
|
*/
|
|
|
|
UNLOCK_GROUP(g);
|
|
}
|
|
|
|
int
|
|
bufferevent_set_rate_limit(struct bufferevent *bev,
|
|
struct ev_token_bucket_cfg *cfg)
|
|
{
|
|
struct bufferevent_private *bevp = BEV_UPCAST(bev);
|
|
int r = -1;
|
|
struct bufferevent_rate_limit *rlim;
|
|
struct timeval now;
|
|
ev_uint32_t tick;
|
|
int reinit = 0, suspended = 0;
|
|
/* XXX reference-count cfg */
|
|
|
|
BEV_LOCK(bev);
|
|
|
|
if (cfg == NULL) {
|
|
if (bevp->rate_limiting) {
|
|
rlim = bevp->rate_limiting;
|
|
rlim->cfg = NULL;
|
|
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
|
|
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
|
|
if (event_initialized(&rlim->refill_bucket_event))
|
|
event_del(&rlim->refill_bucket_event);
|
|
}
|
|
r = 0;
|
|
goto done;
|
|
}
|
|
|
|
event_base_gettimeofday_cached(bev->ev_base, &now);
|
|
tick = ev_token_bucket_get_tick_(&now, cfg);
|
|
|
|
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
|
|
/* no-op */
|
|
r = 0;
|
|
goto done;
|
|
}
|
|
if (bevp->rate_limiting == NULL) {
|
|
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
|
|
if (!rlim)
|
|
goto done;
|
|
bevp->rate_limiting = rlim;
|
|
} else {
|
|
rlim = bevp->rate_limiting;
|
|
}
|
|
reinit = rlim->cfg != NULL;
|
|
|
|
rlim->cfg = cfg;
|
|
ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
|
|
|
|
if (reinit) {
|
|
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
|
|
event_del(&rlim->refill_bucket_event);
|
|
}
|
|
event_assign(&rlim->refill_bucket_event, bev->ev_base,
|
|
-1, EV_FINALIZE, bev_refill_callback_, bevp);
|
|
|
|
if (rlim->limit.read_limit > 0) {
|
|
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
|
|
} else {
|
|
bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
|
|
suspended=1;
|
|
}
|
|
if (rlim->limit.write_limit > 0) {
|
|
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
|
|
} else {
|
|
bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
|
|
suspended = 1;
|
|
}
|
|
|
|
if (suspended)
|
|
event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
|
|
|
|
r = 0;
|
|
|
|
done:
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
struct bufferevent_rate_limit_group *
|
|
bufferevent_rate_limit_group_new(struct event_base *base,
|
|
const struct ev_token_bucket_cfg *cfg)
|
|
{
|
|
struct bufferevent_rate_limit_group *g;
|
|
struct timeval now;
|
|
ev_uint32_t tick;
|
|
|
|
event_base_gettimeofday_cached(base, &now);
|
|
tick = ev_token_bucket_get_tick_(&now, cfg);
|
|
|
|
g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
|
|
if (!g)
|
|
return NULL;
|
|
memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
|
|
LIST_INIT(&g->members);
|
|
|
|
ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
|
|
|
|
event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
|
|
bev_group_refill_callback_, g);
|
|
/*XXXX handle event_add failure */
|
|
event_add(&g->master_refill_event, &cfg->tick_timeout);
|
|
|
|
EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
|
|
|
bufferevent_rate_limit_group_set_min_share(g, 64);
|
|
|
|
evutil_weakrand_seed_(&g->weakrand_seed,
|
|
(ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
|
|
|
|
return g;
|
|
}
|
|
|
|
int
|
|
bufferevent_rate_limit_group_set_cfg(
|
|
struct bufferevent_rate_limit_group *g,
|
|
const struct ev_token_bucket_cfg *cfg)
|
|
{
|
|
int same_tick;
|
|
if (!g || !cfg)
|
|
return -1;
|
|
|
|
LOCK_GROUP(g);
|
|
same_tick = evutil_timercmp(
|
|
&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
|
|
memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
|
|
|
|
if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
|
|
g->rate_limit.read_limit = cfg->read_maximum;
|
|
if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
|
|
g->rate_limit.write_limit = cfg->write_maximum;
|
|
|
|
if (!same_tick) {
|
|
/* This can cause a hiccup in the schedule */
|
|
event_add(&g->master_refill_event, &cfg->tick_timeout);
|
|
}
|
|
|
|
/* The new limits might force us to adjust min_share differently. */
|
|
bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
|
|
|
|
UNLOCK_GROUP(g);
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
bufferevent_rate_limit_group_set_min_share(
|
|
struct bufferevent_rate_limit_group *g,
|
|
size_t share)
|
|
{
|
|
if (share > EV_SSIZE_MAX)
|
|
return -1;
|
|
|
|
g->configured_min_share = share;
|
|
|
|
/* Can't set share to less than the one-tick maximum. IOW, at steady
|
|
* state, at least one connection can go per tick. */
|
|
if (share > g->rate_limit_cfg.read_rate)
|
|
share = g->rate_limit_cfg.read_rate;
|
|
if (share > g->rate_limit_cfg.write_rate)
|
|
share = g->rate_limit_cfg.write_rate;
|
|
|
|
g->min_share = share;
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
|
|
{
|
|
LOCK_GROUP(g);
|
|
EVUTIL_ASSERT(0 == g->n_members);
|
|
event_del(&g->master_refill_event);
|
|
UNLOCK_GROUP(g);
|
|
EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
|
|
mm_free(g);
|
|
}
|
|
|
|
int
|
|
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
|
|
struct bufferevent_rate_limit_group *g)
|
|
{
|
|
int wsuspend, rsuspend;
|
|
struct bufferevent_private *bevp = BEV_UPCAST(bev);
|
|
BEV_LOCK(bev);
|
|
|
|
if (!bevp->rate_limiting) {
|
|
struct bufferevent_rate_limit *rlim;
|
|
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
|
|
if (!rlim) {
|
|
BEV_UNLOCK(bev);
|
|
return -1;
|
|
}
|
|
event_assign(&rlim->refill_bucket_event, bev->ev_base,
|
|
-1, EV_FINALIZE, bev_refill_callback_, bevp);
|
|
bevp->rate_limiting = rlim;
|
|
}
|
|
|
|
if (bevp->rate_limiting->group == g) {
|
|
BEV_UNLOCK(bev);
|
|
return 0;
|
|
}
|
|
if (bevp->rate_limiting->group)
|
|
bufferevent_remove_from_rate_limit_group(bev);
|
|
|
|
LOCK_GROUP(g);
|
|
bevp->rate_limiting->group = g;
|
|
++g->n_members;
|
|
LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
|
|
|
|
rsuspend = g->read_suspended;
|
|
wsuspend = g->write_suspended;
|
|
|
|
UNLOCK_GROUP(g);
|
|
|
|
if (rsuspend)
|
|
bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
|
|
if (wsuspend)
|
|
bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
|
|
|
|
BEV_UNLOCK(bev);
|
|
return 0;
|
|
}
|
|
|
|
/** Remove 'bev' from its rate-limit group (if any) and lift any group-level
    suspension on it.  Returns the result of the internal helper. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev, unsuspend);
}
|
|
|
|
int
|
|
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
|
|
int unsuspend)
|
|
{
|
|
struct bufferevent_private *bevp = BEV_UPCAST(bev);
|
|
BEV_LOCK(bev);
|
|
if (bevp->rate_limiting && bevp->rate_limiting->group) {
|
|
struct bufferevent_rate_limit_group *g =
|
|
bevp->rate_limiting->group;
|
|
LOCK_GROUP(g);
|
|
bevp->rate_limiting->group = NULL;
|
|
--g->n_members;
|
|
LIST_REMOVE(bevp, rate_limiting->next_in_group);
|
|
UNLOCK_GROUP(g);
|
|
}
|
|
if (unsuspend) {
|
|
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
|
|
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
|
|
}
|
|
BEV_UNLOCK(bev);
|
|
return 0;
|
|
}
|
|
|
|
/* ===
|
|
* API functions to expose rate limits.
|
|
*
|
|
* Don't use these from inside Libevent; they're meant to be for use by
|
|
* the program.
|
|
* === */
|
|
|
|
/* Mostly you don't want to use this function from inside libevent;
|
|
* bufferevent_get_read_max_() is more likely what you want*/
|
|
ev_ssize_t
|
|
bufferevent_get_read_limit(struct bufferevent *bev)
|
|
{
|
|
ev_ssize_t r;
|
|
struct bufferevent_private *bevp;
|
|
BEV_LOCK(bev);
|
|
bevp = BEV_UPCAST(bev);
|
|
if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
|
|
bufferevent_update_buckets(bevp);
|
|
r = bevp->rate_limiting->limit.read_limit;
|
|
} else {
|
|
r = EV_SSIZE_MAX;
|
|
}
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
/* Mostly you don't want to use this function from inside libevent;
|
|
* bufferevent_get_write_max_() is more likely what you want*/
|
|
ev_ssize_t
|
|
bufferevent_get_write_limit(struct bufferevent *bev)
|
|
{
|
|
ev_ssize_t r;
|
|
struct bufferevent_private *bevp;
|
|
BEV_LOCK(bev);
|
|
bevp = BEV_UPCAST(bev);
|
|
if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
|
|
bufferevent_update_buckets(bevp);
|
|
r = bevp->rate_limiting->limit.write_limit;
|
|
} else {
|
|
r = EV_SSIZE_MAX;
|
|
}
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
int
|
|
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
|
|
{
|
|
struct bufferevent_private *bevp;
|
|
int ret = 0;
|
|
BEV_LOCK(bev);
|
|
bevp = BEV_UPCAST(bev);
|
|
if (size == 0 || size > EV_SSIZE_MAX)
|
|
bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
|
|
else
|
|
bevp->max_single_read = size;
|
|
ret = evbuffer_set_max_read(bev->input, bevp->max_single_read);
|
|
BEV_UNLOCK(bev);
|
|
return ret;
|
|
}
|
|
|
|
int
|
|
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
|
|
{
|
|
struct bufferevent_private *bevp;
|
|
BEV_LOCK(bev);
|
|
bevp = BEV_UPCAST(bev);
|
|
if (size == 0 || size > EV_SSIZE_MAX)
|
|
bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
|
|
else
|
|
bevp->max_single_write = size;
|
|
BEV_UNLOCK(bev);
|
|
return 0;
|
|
}
|
|
|
|
ev_ssize_t
|
|
bufferevent_get_max_single_read(struct bufferevent *bev)
|
|
{
|
|
ev_ssize_t r;
|
|
|
|
BEV_LOCK(bev);
|
|
r = BEV_UPCAST(bev)->max_single_read;
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
ev_ssize_t
|
|
bufferevent_get_max_single_write(struct bufferevent *bev)
|
|
{
|
|
ev_ssize_t r;
|
|
|
|
BEV_LOCK(bev);
|
|
r = BEV_UPCAST(bev)->max_single_write;
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
ev_ssize_t
|
|
bufferevent_get_max_to_read(struct bufferevent *bev)
|
|
{
|
|
ev_ssize_t r;
|
|
BEV_LOCK(bev);
|
|
r = bufferevent_get_read_max_(BEV_UPCAST(bev));
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
ev_ssize_t
|
|
bufferevent_get_max_to_write(struct bufferevent *bev)
|
|
{
|
|
ev_ssize_t r;
|
|
BEV_LOCK(bev);
|
|
r = bufferevent_get_write_max_(BEV_UPCAST(bev));
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
const struct ev_token_bucket_cfg *
|
|
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
|
|
struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
|
|
struct ev_token_bucket_cfg *cfg;
|
|
|
|
BEV_LOCK(bev);
|
|
|
|
if (bufev_private->rate_limiting) {
|
|
cfg = bufev_private->rate_limiting->cfg;
|
|
} else {
|
|
cfg = NULL;
|
|
}
|
|
|
|
BEV_UNLOCK(bev);
|
|
|
|
return cfg;
|
|
}
|
|
|
|
/* Mostly you don't want to use this function from inside libevent;
|
|
* bufferevent_get_read_max_() is more likely what you want*/
|
|
ev_ssize_t
|
|
bufferevent_rate_limit_group_get_read_limit(
|
|
struct bufferevent_rate_limit_group *grp)
|
|
{
|
|
ev_ssize_t r;
|
|
LOCK_GROUP(grp);
|
|
r = grp->rate_limit.read_limit;
|
|
UNLOCK_GROUP(grp);
|
|
return r;
|
|
}
|
|
|
|
/* Mostly you don't want to use this function from inside libevent;
|
|
* bufferevent_get_write_max_() is more likely what you want. */
|
|
ev_ssize_t
|
|
bufferevent_rate_limit_group_get_write_limit(
|
|
struct bufferevent_rate_limit_group *grp)
|
|
{
|
|
ev_ssize_t r;
|
|
LOCK_GROUP(grp);
|
|
r = grp->rate_limit.write_limit;
|
|
UNLOCK_GROUP(grp);
|
|
return r;
|
|
}
|
|
|
|
int
|
|
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
|
|
{
|
|
int r = 0;
|
|
ev_ssize_t old_limit, new_limit;
|
|
struct bufferevent_private *bevp;
|
|
BEV_LOCK(bev);
|
|
bevp = BEV_UPCAST(bev);
|
|
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
|
|
old_limit = bevp->rate_limiting->limit.read_limit;
|
|
|
|
new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
|
|
if (old_limit > 0 && new_limit <= 0) {
|
|
bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
|
|
if (event_add(&bevp->rate_limiting->refill_bucket_event,
|
|
&bevp->rate_limiting->cfg->tick_timeout) < 0)
|
|
r = -1;
|
|
} else if (old_limit <= 0 && new_limit > 0) {
|
|
if (!(bevp->write_suspended & BEV_SUSPEND_BW))
|
|
event_del(&bevp->rate_limiting->refill_bucket_event);
|
|
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
|
|
}
|
|
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
int
|
|
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
|
|
{
|
|
/* XXXX this is mostly copy-and-paste from
|
|
* bufferevent_decrement_read_limit */
|
|
int r = 0;
|
|
ev_ssize_t old_limit, new_limit;
|
|
struct bufferevent_private *bevp;
|
|
BEV_LOCK(bev);
|
|
bevp = BEV_UPCAST(bev);
|
|
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
|
|
old_limit = bevp->rate_limiting->limit.write_limit;
|
|
|
|
new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
|
|
if (old_limit > 0 && new_limit <= 0) {
|
|
bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
|
|
if (event_add(&bevp->rate_limiting->refill_bucket_event,
|
|
&bevp->rate_limiting->cfg->tick_timeout) < 0)
|
|
r = -1;
|
|
} else if (old_limit <= 0 && new_limit > 0) {
|
|
if (!(bevp->read_suspended & BEV_SUSPEND_BW))
|
|
event_del(&bevp->rate_limiting->refill_bucket_event);
|
|
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
|
|
}
|
|
|
|
BEV_UNLOCK(bev);
|
|
return r;
|
|
}
|
|
|
|
int
|
|
bufferevent_rate_limit_group_decrement_read(
|
|
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
|
|
{
|
|
int r = 0;
|
|
ev_ssize_t old_limit, new_limit;
|
|
LOCK_GROUP(grp);
|
|
old_limit = grp->rate_limit.read_limit;
|
|
new_limit = (grp->rate_limit.read_limit -= decr);
|
|
|
|
if (old_limit > 0 && new_limit <= 0) {
|
|
bev_group_suspend_reading_(grp);
|
|
} else if (old_limit <= 0 && new_limit > 0) {
|
|
bev_group_unsuspend_reading_(grp);
|
|
}
|
|
|
|
UNLOCK_GROUP(grp);
|
|
return r;
|
|
}
|
|
|
|
int
|
|
bufferevent_rate_limit_group_decrement_write(
|
|
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
|
|
{
|
|
int r = 0;
|
|
ev_ssize_t old_limit, new_limit;
|
|
LOCK_GROUP(grp);
|
|
old_limit = grp->rate_limit.write_limit;
|
|
new_limit = (grp->rate_limit.write_limit -= decr);
|
|
|
|
if (old_limit > 0 && new_limit <= 0) {
|
|
bev_group_suspend_writing_(grp);
|
|
} else if (old_limit <= 0 && new_limit > 0) {
|
|
bev_group_unsuspend_writing_(grp);
|
|
}
|
|
|
|
UNLOCK_GROUP(grp);
|
|
return r;
|
|
}
|
|
|
|
void
|
|
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
|
|
ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
|
|
{
|
|
EVUTIL_ASSERT(grp != NULL);
|
|
if (total_read_out)
|
|
*total_read_out = grp->total_read;
|
|
if (total_written_out)
|
|
*total_written_out = grp->total_written;
|
|
}
|
|
|
|
void
|
|
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
|
|
{
|
|
grp->total_read = grp->total_written = 0;
|
|
}
|
|
|
|
int
|
|
bufferevent_ratelim_init_(struct bufferevent_private *bev)
|
|
{
|
|
bev->rate_limiting = NULL;
|
|
bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
|
|
bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
|
|
|
|
if (evbuffer_set_max_read(bev->bev.input, bev->max_single_read))
|
|
return -1;
|
|
|
|
return 0;
|
|
}
|