mirror of
https://github.com/libevent/libevent.git
synced 2025-01-09 00:56:20 +08:00
Merge branch 'be-wm-overrun-v2'
* be-wm-overrun-v2: Fix hangs due to watermarks overruns in bufferevents implementations test: cover watermarks (with some corner cases) in ssl bufferevent Fixes: #690
This commit is contained in:
commit
878bb2d3b9
@ -111,6 +111,28 @@ bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flag
|
||||
BEV_UNLOCK(bufev);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sometimes bufferevent's implementation can overrun high watermarks
|
||||
* (one of examples is openssl) and in this case if the read callback
|
||||
* will not handle enough data do over condition above the read
|
||||
* callback will never be called again (due to suspend above).
|
||||
*
|
||||
* To avoid this we are scheduling read callback again here, but only
|
||||
* from the user callback to avoid multiple scheduling:
|
||||
* - when the data had been added to it
|
||||
* - when the data had been drained from it (user specified read callback)
|
||||
*/
|
||||
static void bufferevent_inbuf_wm_check(struct bufferevent *bev)
|
||||
{
|
||||
if (!bev->wm_read.high)
|
||||
return;
|
||||
if (!(bev->enabled & EV_READ))
|
||||
return;
|
||||
if (evbuffer_get_length(bev->input) < bev->wm_read.high)
|
||||
return;
|
||||
|
||||
bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
|
||||
}
|
||||
|
||||
/* Callback to implement watermarks on the input buffer. Only enabled
|
||||
* if the watermark is set. */
|
||||
@ -147,6 +169,7 @@ bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
|
||||
if (bufev_private->readcb_pending && bufev->readcb) {
|
||||
bufev_private->readcb_pending = 0;
|
||||
bufev->readcb(bufev, bufev->cbarg);
|
||||
bufferevent_inbuf_wm_check(bufev);
|
||||
}
|
||||
if (bufev_private->writecb_pending && bufev->writecb) {
|
||||
bufev_private->writecb_pending = 0;
|
||||
@ -187,6 +210,7 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
|
||||
void *cbarg = bufev->cbarg;
|
||||
bufev_private->readcb_pending = 0;
|
||||
UNLOCKED(readcb(bufev, cbarg));
|
||||
bufferevent_inbuf_wm_check(bufev);
|
||||
}
|
||||
if (bufev_private->writecb_pending && bufev->writecb) {
|
||||
bufferevent_data_cb writecb = bufev->writecb;
|
||||
@ -230,6 +254,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev, int options)
|
||||
SCHEDULE_DEFERRED(p);
|
||||
} else {
|
||||
bufev->readcb(bufev, bufev->cbarg);
|
||||
bufferevent_inbuf_wm_check(bufev);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -518,6 +518,9 @@ int bufferevent_set_timeouts(struct bufferevent *bufev,
|
||||
On input, a bufferevent does not invoke the user read callback unless
|
||||
there is at least low watermark data in the buffer. If the read buffer
|
||||
is beyond the high watermark, the bufferevent stops reading from the network.
|
||||
But be aware that bufferevent input/read buffer can overrun high watermark
|
||||
limit (typical example is openssl bufferevent), so you should not relay in
|
||||
this.
|
||||
|
||||
On output, the user write callback is invoked whenever the buffered data
|
||||
falls below the low watermark. Filters that write to this bufev will try
|
||||
|
@ -733,6 +733,169 @@ end:
|
||||
;
|
||||
}
|
||||
|
||||
struct wm_context
|
||||
{
|
||||
int server;
|
||||
struct evbuffer *data;
|
||||
size_t to_read;
|
||||
size_t wm_high;
|
||||
size_t limit;
|
||||
size_t get;
|
||||
struct bufferevent *bev;
|
||||
};
|
||||
static void
|
||||
wm_transfer(struct bufferevent *bev, void *arg)
|
||||
{
|
||||
struct wm_context *ctx = arg;
|
||||
struct evbuffer *in = bufferevent_get_input(bev);
|
||||
struct evbuffer *out = bufferevent_get_output(bev);
|
||||
size_t len = evbuffer_get_length(in);
|
||||
size_t drain = len < ctx->to_read ? len : ctx->to_read;
|
||||
|
||||
evbuffer_drain(in, drain);
|
||||
ctx->get += drain;
|
||||
|
||||
TT_BLATHER(("wm_transfer-%s: in: %zu, out: %zu, got: %zu",
|
||||
ctx->server ? "server" : "client",
|
||||
evbuffer_get_length(in),
|
||||
evbuffer_get_length(out),
|
||||
ctx->get));
|
||||
|
||||
evbuffer_add_buffer_reference(out, ctx->data);
|
||||
if (ctx->get >= ctx->limit) {
|
||||
TT_BLATHER(("wm_transfer-%s: break",
|
||||
ctx->server ? "server" : "client"));
|
||||
bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
|
||||
bufferevent_disable(bev, EV_READ);
|
||||
}
|
||||
}
|
||||
static void
|
||||
wm_eventcb(struct bufferevent *bev, short what, void *arg)
|
||||
{
|
||||
struct wm_context *ctx = arg;
|
||||
TT_BLATHER(("wm_eventcb-%s: %i",
|
||||
ctx->server ? "server" : "client", what));
|
||||
if (what & BEV_EVENT_CONNECTED) {
|
||||
} else {
|
||||
ctx->get = 0;
|
||||
}
|
||||
}
|
||||
static void
|
||||
wm_acceptcb(struct evconnlistener *listener, evutil_socket_t fd,
|
||||
struct sockaddr *addr, int socklen, void *arg)
|
||||
{
|
||||
struct wm_context *ctx = arg;
|
||||
struct bufferevent *bev;
|
||||
struct event_base *base = evconnlistener_get_base(listener);
|
||||
SSL *ssl = SSL_new(get_ssl_ctx());
|
||||
|
||||
SSL_use_certificate(ssl, ssl_getcert());
|
||||
SSL_use_PrivateKey(ssl, ssl_getkey());
|
||||
|
||||
bev = bufferevent_openssl_socket_new(
|
||||
base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
|
||||
|
||||
bufferevent_setwatermark(bev, EV_READ, 0, ctx->wm_high);
|
||||
bufferevent_setcb(bev, wm_transfer, NULL, wm_eventcb, ctx);
|
||||
bufferevent_enable(bev, EV_READ|EV_WRITE);
|
||||
ctx->bev = bev;
|
||||
|
||||
/* Only accept once, then disable ourself. */
|
||||
evconnlistener_disable(listener);
|
||||
}
|
||||
static void
|
||||
regress_bufferevent_openssl_wm(void *arg)
|
||||
{
|
||||
struct basic_test_data *data = arg;
|
||||
struct event_base *base = data->base;
|
||||
|
||||
struct evconnlistener *listener;
|
||||
struct bufferevent *bev;
|
||||
struct sockaddr_in sin;
|
||||
struct sockaddr_storage ss;
|
||||
enum regress_openssl_type type =
|
||||
(enum regress_openssl_type)data->setup_data;
|
||||
int bev_flags = BEV_OPT_CLOSE_ON_FREE;
|
||||
ev_socklen_t slen;
|
||||
SSL *ssl;
|
||||
struct wm_context client, server;
|
||||
char *payload;
|
||||
size_t payload_len = 1<<10;
|
||||
size_t wm_high = 5<<10;
|
||||
|
||||
init_ssl();
|
||||
|
||||
memset(&sin, 0, sizeof(sin));
|
||||
sin.sin_family = AF_INET;
|
||||
sin.sin_addr.s_addr = htonl(0x7f000001);
|
||||
|
||||
memset(&ss, 0, sizeof(ss));
|
||||
slen = sizeof(ss);
|
||||
|
||||
memset(&client, 0, sizeof(client));
|
||||
memset(&server, 0, sizeof(server));
|
||||
client.server = 0;
|
||||
server.server = 1;
|
||||
client.data = evbuffer_new();
|
||||
server.data = evbuffer_new();
|
||||
payload = calloc(1, payload_len);
|
||||
memset(payload, 'A', payload_len);
|
||||
evbuffer_add(server.data, payload, payload_len);
|
||||
evbuffer_add(client.data, payload, payload_len);
|
||||
client.wm_high = server.wm_high = wm_high;
|
||||
client.limit = server.limit = wm_high<<3;
|
||||
client.to_read = server.to_read = payload_len>>1;
|
||||
|
||||
TT_BLATHER(("openssl_wm: payload_len = %zu, wm_high = %zu, limit = %zu, to_read: %zu",
|
||||
payload_len, wm_high, server.limit, server.to_read));
|
||||
|
||||
listener = evconnlistener_new_bind(base, wm_acceptcb, &server,
|
||||
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
|
||||
-1, (struct sockaddr *)&sin, sizeof(sin));
|
||||
|
||||
tt_assert(listener);
|
||||
tt_assert(evconnlistener_get_fd(listener) >= 0);
|
||||
|
||||
ssl = SSL_new(get_ssl_ctx());
|
||||
tt_assert(ssl);
|
||||
|
||||
if (type & REGRESS_OPENSSL_FILTER) {
|
||||
bev = bufferevent_socket_new(data->base, -1, bev_flags);
|
||||
tt_assert(bev);
|
||||
bev = bufferevent_openssl_filter_new(
|
||||
base, bev, ssl, BUFFEREVENT_SSL_CONNECTING, bev_flags);
|
||||
} else {
|
||||
bev = bufferevent_openssl_socket_new(
|
||||
data->base, -1, ssl,
|
||||
BUFFEREVENT_SSL_CONNECTING,
|
||||
bev_flags);
|
||||
}
|
||||
tt_assert(bev);
|
||||
client.bev = bev;
|
||||
|
||||
bufferevent_setwatermark(bev, EV_READ, 0, client.wm_high);
|
||||
bufferevent_setcb(bev, wm_transfer, NULL, wm_eventcb, &client);
|
||||
|
||||
tt_assert(getsockname(evconnlistener_get_fd(listener),
|
||||
(struct sockaddr*)&ss, &slen) == 0);
|
||||
|
||||
tt_assert(!bufferevent_socket_connect(bev, (struct sockaddr*)&ss, slen));
|
||||
tt_assert(!evbuffer_add_buffer_reference(bufferevent_get_output(bev), client.data));
|
||||
tt_assert(!bufferevent_enable(bev, EV_READ|EV_WRITE));
|
||||
|
||||
event_base_dispatch(base);
|
||||
|
||||
tt_int_op(client.get, ==, client.limit);
|
||||
tt_int_op(server.get, ==, server.limit);
|
||||
end:
|
||||
free(payload);
|
||||
evbuffer_free(client.data);
|
||||
evbuffer_free(server.data);
|
||||
evconnlistener_free(listener);
|
||||
bufferevent_free(client.bev);
|
||||
bufferevent_free(server.bev);
|
||||
}
|
||||
|
||||
struct testcase_t ssl_testcases[] = {
|
||||
#define T(a) ((void *)(a))
|
||||
{ "bufferevent_socketpair", regress_bufferevent_openssl,
|
||||
@ -808,6 +971,11 @@ struct testcase_t ssl_testcases[] = {
|
||||
{ "bufferevent_connect_sleep", regress_bufferevent_openssl_connect,
|
||||
TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_SLEEP) },
|
||||
|
||||
{ "bufferevent_wm", regress_bufferevent_openssl_wm,
|
||||
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
|
||||
{ "bufferevent_wm_filter", regress_bufferevent_openssl_wm,
|
||||
TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_FILTER) },
|
||||
|
||||
#undef T
|
||||
|
||||
END_OF_TESTCASES,
|
||||
|
Loading…
x
Reference in New Issue
Block a user