Provide consistent, tested semantics for bufferevent timeouts

The different bufferevent implementations had different behavior for
their timeouts.  Some of them kept re-triggering the timeouts
indefinitely; some disabled the event immediately the first time a
timeout triggered.  Some of them made the timeouts only count when
the bufferevent was actively trying to read or write; some did not.

The new behavior is modeled after old socket bufferevents, since
they were here first and their behavior is relatively sane.
Basically, each timeout disables the bufferevent's corresponding
read or write operation when it fires.  Timeouts are stopped
whenever we suspend writing or reading, and reset whenever we
unsuspend writing or reading.  Calling bufferevent_enable resets a
timeout, as does changing the timeout value.
This commit is contained in:
Nick Mathewson 2010-02-20 18:44:35 -05:00
parent 38ec0a773b
commit d3288293fd
9 changed files with 232 additions and 21 deletions

View File

@ -328,6 +328,11 @@ int _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
if (evutil_timerisset(&(bev)->timeout_write)) \
event_add(&(bev)->ev_write, &(bev)->timeout_write); \
} while (0)
#define BEV_DEL_GENERIC_READ_TIMEOUT(bev) \
event_del(&(bev)->ev_read)
#define BEV_DEL_GENERIC_WRITE_TIMEOUT(bev) \
event_del(&(bev)->ev_write)
/** Internal: Given a bufferevent, return its corresponding
* bufferevent_private. */

View File

@ -676,6 +676,7 @@ bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
struct bufferevent *bev = ctx;
_bufferevent_incref_and_lock(bev);
bufferevent_disable(bev, EV_READ);
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
_bufferevent_decref_and_unlock(bev);
}
@ -684,6 +685,7 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
struct bufferevent *bev = ctx;
_bufferevent_incref_and_lock(bev);
bufferevent_disable(bev, EV_WRITE);
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
_bufferevent_decref_and_unlock(bev);
}
@ -712,13 +714,18 @@ int
_bufferevent_generic_adj_timeouts(struct bufferevent *bev)
{
const short enabled = bev->enabled;
struct bufferevent_private *bev_p =
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
int r1=0, r2=0;
if ((enabled & EV_READ) && evutil_timerisset(&bev->timeout_read))
if ((enabled & EV_READ) && !bev_p->read_suspended &&
evutil_timerisset(&bev->timeout_read))
r1 = event_add(&bev->ev_read, &bev->timeout_read);
else
r1 = event_del(&bev->ev_read);
if ((enabled & EV_WRITE) && evutil_timerisset(&bev->timeout_write))
if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
evutil_timerisset(&bev->timeout_write) &&
evbuffer_get_length(bev->output))
r2 = event_add(&bev->ev_write, &bev->timeout_write);
else
r2 = event_del(&bev->ev_write);

View File

@ -242,8 +242,10 @@ be_async_enable(struct bufferevent *buf, short what)
return -1;
/* NOTE: This interferes with non-blocking connect */
if (_bufferevent_generic_adj_timeouts(buf) < 0)
return -1;
if (event & EV_READ)
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (event & EV_WRITE)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
/* If we newly enable reading or writing, and we aren't reading or
writing already, consider launching a new read or write. */
@ -262,7 +264,10 @@ be_async_disable(struct bufferevent *bev, short what)
* canceling any in-progress read or write operation, though it might
* not work. */
_bufferevent_generic_adj_timeouts(bev);
if (event & EV_READ)
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
if (event & EV_WRITE)
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
return 0;
}

View File

@ -222,7 +222,10 @@ static int
be_filter_enable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
_bufferevent_generic_adj_timeouts(bev);
if (event & EV_READ)
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (event & EV_WRITE)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
return bufferevent_enable(bevf->underlying, event);
}
@ -230,7 +233,10 @@ static int
be_filter_disable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
_bufferevent_generic_adj_timeouts(bev);
if (event & EV_READ)
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
if (event & EV_WRITE)
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
return bufferevent_disable(bevf->underlying, event);
}

View File

@ -982,7 +982,10 @@ be_openssl_enable(struct bufferevent *bev, short events)
r2 = start_writing(bev_ssl);
if (bev_ssl->underlying) {
_bufferevent_generic_adj_timeouts(bev);
if (events & EV_READ)
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (events & EV_WRITE)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (events & EV_READ)
consider_reading(bev_ssl);
@ -1004,8 +1007,12 @@ be_openssl_disable(struct bufferevent *bev, short events)
if (events & EV_WRITE)
stop_writing(bev_ssl);
if (bev_ssl->underlying)
_bufferevent_generic_adj_timeouts(bev);
if (bev_ssl->underlying) {
if (events & EV_READ)
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
if (events & EV_WRITE)
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
}
return 0;
}

View File

@ -165,14 +165,22 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
} else {
if (!ignore_wm)
goto done;
n = evbuffer_get_length(src->output);
evbuffer_add_buffer(dst->input, src->output);
}
} else {
n = evbuffer_get_length(src->output);
evbuffer_add_buffer(dst->input, src->output);
}
BEV_RESET_GENERIC_READ_TIMEOUT(dst);
BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
if (n) {
BEV_RESET_GENERIC_READ_TIMEOUT(dst);
if (evbuffer_get_length(dst->output))
BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
else
BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
}
src_size = evbuffer_get_length(src->output);
dst_size = evbuffer_get_length(dst->input);
@ -226,8 +234,11 @@ be_pair_enable(struct bufferevent *bufev, short events)
incref_and_lock(bufev);
if (_bufferevent_generic_adj_timeouts(bufev) < 0)
return -1;
if (events & EV_READ) {
BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
}
if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
/* We're starting to read! Does the other side have anything to write?*/
if ((events & EV_READ) && partner &&
@ -246,7 +257,12 @@ be_pair_enable(struct bufferevent *bufev, short events)
static int
be_pair_disable(struct bufferevent *bev, short events)
{
return _bufferevent_generic_adj_timeouts(bev);
if (events & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
}
if (events & EV_WRITE)
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
return 0;
}
static void

View File

@ -104,10 +104,13 @@ bufferevent_socket_outbuf_cb(struct evbuffer *buf,
void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
!bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
be_socket_add(&bufev->ev_write, &bufev->timeout_write);
@ -184,7 +187,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
goto done;
error:
event_del(&bufev->ev_read);
bufferevent_disable(bufev, EV_READ);
_bufferevent_run_eventcb(bufev, what);
done:
@ -268,8 +271,9 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
_bufferevent_decrement_write_buckets(bufev_p, res);
}
if (evbuffer_get_length(bufev->output) == 0)
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
/*
* Invoke the user callback if our buffer is drained or below the
@ -283,12 +287,13 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
goto done;
reschedule:
if (evbuffer_get_length(bufev->output) == 0)
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
goto done;
error:
event_del(&bufev->ev_write);
bufferevent_disable(bufev, EV_WRITE);
_bufferevent_run_eventcb(bufev, what);
done:
@ -553,9 +558,10 @@ be_socket_adj_timeouts(struct bufferevent *bufev)
if (event_pending(&bufev->ev_read, EV_READ, NULL))
if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
r = -1;
if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
r = -1;
}
return r;
}

View File

@ -360,6 +360,25 @@ short bufferevent_get_enabled(struct bufferevent *bufev);
/**
Set the read and write timeout for a buffered event.
A bufferevent's timeout will fire the first time that the indicated
amount of time has elapsed since a successful read or write operation,
during which the bufferevent was trying to read or write.
(In other words, if reading or writing is disabled, or if the
bufferevent's read or write operation has been suspended because
there's no data to write, or not enough banwidth, or so on, the
timeout isn't active. The timeout only becomes active when we we're
willing to actually read or write.)
Calling bufferevent_enable or setting a timeout for a bufferevent
whose timeout is already pending resets its timeout.
If the timeout elapses, the corresponding operation (EV_READ or
EV_WRITE) becomes disabled until you re-enable it again. The
bufferevent's event callback is called with the
BEV_EVENT_TIMEOUT|BEV_EVENT_READING or
BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING.
@param bufev the bufferevent to be modified
@param timeout_read the read timeout, or NULL
@param timeout_write the write timeout, or NULL

View File

@ -613,6 +613,142 @@ end:
event_del(&close_listener_event);
}
struct timeout_cb_result {
struct timeval read_timeout_at;
struct timeval write_timeout_at;
struct timeval last_wrote_at;
int n_read_timeouts;
int n_write_timeouts;
int total_calls;
};
static long
msec_diff(const struct timeval *start, const struct timeval *end)
{
long ms = end->tv_sec - start->tv_sec;
ms *= 1000;
ms += ((end->tv_usec - start->tv_usec)+500) / 1000;
return ms;
}
static void
bev_timeout_write_cb(struct bufferevent *bev, void *arg)
{
struct timeout_cb_result *res = arg;
evutil_gettimeofday(&res->last_wrote_at, NULL);
}
static void
bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
{
struct timeout_cb_result *res = arg;
++res->total_calls;
if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
== (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
evutil_gettimeofday(&res->read_timeout_at, NULL);
++res->n_read_timeouts;
}
if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
== (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
evutil_gettimeofday(&res->write_timeout_at, NULL);
++res->n_write_timeouts;
}
}
static void
test_bufferevent_timeouts(void *arg)
{
/* "arg" is a string containing "pair" and/or "nodata" */
struct bufferevent *bev1 = NULL, *bev2 = NULL;
struct basic_test_data *data = arg;
int use_pair = 0;
struct timeval tv_w, tv_r, started_at;
struct timeout_cb_result res1, res2;
char buf[1024];
memset(&res1, 0, sizeof(res1));
memset(&res2, 0, sizeof(res2));
if (strstr((char*)data->setup_data, "pair"))
use_pair = 1;
if (use_pair) {
struct bufferevent *p[2];
tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
bev1 = p[0];
bev2 = p[1];
} else {
bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
}
/* Do this nice and early. */
bufferevent_disable(bev2, EV_READ);
/* bev1 will try to write and read. Both will time out. */
evutil_gettimeofday(&started_at, NULL);
tv_w.tv_sec = tv_r.tv_sec = 0;
tv_w.tv_usec = 100*1000;
tv_r.tv_usec = 150*1000;
bufferevent_setcb(bev1, NULL, bev_timeout_write_cb,
bev_timeout_event_cb, &res1);
bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0);
bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
if (use_pair) {
/* For a pair, the fact that the other side isn't reading
* makes the writer stall */
bufferevent_write(bev1, "ABCDEFG", 7);
} else {
/* For a real socket, the kernel's TCP buffers can eat a
* fair number of bytes; make sure that at some point we
* have some bytes that will stall. */
struct evbuffer *output = bufferevent_get_output(bev1);
int i;
memset(buf, 0xbb, sizeof(buf));
for (i=0;i<1024;++i) {
evbuffer_add_reference(output, buf, sizeof(buf),
NULL, NULL);
}
}
bufferevent_enable(bev1, EV_READ|EV_WRITE);
/* bev2 has nothing to say, and isn't listening. */
bufferevent_setcb(bev2, NULL, bev_timeout_write_cb,
bev_timeout_event_cb, &res2);
tv_w.tv_sec = tv_r.tv_sec = 0;
tv_w.tv_usec = 200*1000;
tv_r.tv_usec = 100*1000;
bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
bufferevent_enable(bev2, EV_WRITE);
tv_r.tv_sec = 1;
tv_r.tv_usec = 0;
event_base_loopexit(data->base, &tv_r);
event_base_dispatch(data->base);
/* XXXX Test that actually reading or writing a little resets the
* timeouts. */
/* Each buf1 timeout happens, and happens only once. */
tt_want(res1.n_read_timeouts);
tt_want(res1.n_write_timeouts);
tt_want(res1.n_read_timeouts == 1);
tt_want(res1.n_write_timeouts == 1);
tt_int_op(abs(msec_diff(&started_at, &res1.read_timeout_at)-150),
<=, 40);
tt_int_op(abs(msec_diff(&started_at, &res1.write_timeout_at)-100),
<=, 30);
end:
if (bev1)
bufferevent_free(bev1);
if (bev2)
bufferevent_free(bev2);
}
struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED),
@ -632,6 +768,10 @@ struct testcase_t bufferevent_testcases[] = {
(void*)"defer lock" },
{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
{ "bufferevent_timeouts", test_bufferevent_timeouts,
TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" },
{ "bufferevent_pair_timeouts", test_bufferevent_timeouts,
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
#ifdef _EVENT_HAVE_LIBZ
LEGACY(bufferevent_zlib, TT_ISOLATED),
#else