bufferevent_filter: Merge branch 'be-filter-data-stuck'

Fixes data stuck in filters with active watermarks.

* be-filter-data-stuck:
  test/be_filter: creating test case for data stuck with active watermarks
  be_filter: avoid data stuck under active watermarks
This commit is contained in:
Azat Khuzhin 2016-06-19 13:21:15 +03:00
commit 9b294082fb
2 changed files with 176 additions and 6 deletions

View File

@ -71,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev,
short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo, void *arg);
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *info, void *arg);
@ -79,6 +82,8 @@ struct bufferevent_filtered {
/** The bufferevent that we read/write filtered data from/to. */
struct bufferevent *underlying;
/** A callback on our inbuf to notice somebory removes data */
struct evbuffer_cb_entry *inbuf_cb;
/** A callback on our outbuf to notice when somebody adds data */
struct evbuffer_cb_entry *outbuf_cb;
/** True iff we have received an EOF callback from the underlying
@ -203,6 +208,11 @@ bufferevent_filter_new(struct bufferevent *underlying,
bufferevent_setcb(bufev_f->underlying,
be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
bufferevent_filtered_inbuf_cb, bufev_f);
evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
EVBUFFER_CB_ENABLED);
bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
bufferevent_filtered_outbuf_cb, bufev_f);
@ -251,6 +261,12 @@ be_filter_destruct(struct bufferevent *bev)
EVUTIL_ASSERT(bevf);
if (bevf->free_context)
bevf->free_context(bevf->context);
if (bevf->inbuf_cb)
evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
if (bevf->outbuf_cb)
evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
}
static int
@ -418,9 +434,8 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
}
}
/* Called when the underlying socket has read. */
static void
be_filter_readcb(struct bufferevent *underlying, void *me_)
be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
{
struct bufferevent_filtered *bevf = me_;
enum bufferevent_filter_result res;
@ -429,8 +444,6 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
int processed_any = 0;
BEV_LOCK(bufev);
// It's possible our refcount is 0 at this point if another thread free'd our filterevent
EVUTIL_ASSERT(bufev_private->refcnt >= 0);
@ -449,11 +462,65 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
/* XXX This should be in process_input, not here. There are
* other places that can call process-input, and they should
* force readcb calls as needed. */
if (processed_any)
if (processed_any) {
bufferevent_trigger_nolock_(bufev, EV_READ, 0);
if (evbuffer_get_length(underlying->input) > 0 &&
be_readbuf_full(bevf, state)) {
/* data left in underlying buffer and filter input buffer
* hit its read high watermark.
* Schedule callback to avoid data gets stuck in underlying
* input buffer.
*/
evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
EVBUFFER_CB_ENABLED);
}
}
}
}
/* Called when the size of our inbuf changes. */
static void
bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo, void *arg)
{
struct bufferevent_filtered *bevf = arg;
enum bufferevent_flush_mode state;
struct bufferevent *bev = downcast(bevf);
BEV_LOCK(bev);
if (bevf->got_eof)
state = BEV_FINISHED;
else
state = BEV_NORMAL;
if (!be_readbuf_full(bevf, state)) {
/* opportunity to read data which was left in underlying
* input buffer because filter input buffer hit read
* high watermark.
*/
evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
EVBUFFER_CB_ENABLED);
if (evbuffer_get_length(bevf->underlying->input) > 0)
be_filter_read_nolock_(bevf->underlying, bevf);
}
BEV_UNLOCK(bufev);
BEV_UNLOCK(bev);
}
/* Called when the underlying socket has read. */
static void
be_filter_readcb(struct bufferevent *underlying, void *me_)
{
struct bufferevent_filtered *bevf = me_;
struct bufferevent *bev = downcast(bevf);
BEV_LOCK(bev);
be_filter_read_nolock_(underlying, me_);
BEV_UNLOCK(bev);
}
/* Called when the underlying socket has drained enough that we can write to

View File

@ -1227,6 +1227,106 @@ end:
bufferevent_free(bev2);
}
struct bufferevent_filter_data_stuck {
size_t header_size;
size_t total_read;
};
static void
bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg)
{
struct bufferevent_filter_data_stuck *filter_data = arg;
struct evbuffer *input = bufferevent_get_input(bev);
size_t read_size = evbuffer_get_length(input);
evbuffer_drain(input, read_size);
filter_data->total_read += read_size;
}
/**
* This filter prepends header once before forwarding data.
*/
static enum bufferevent_filter_result
bufferevent_filter_data_stuck_inputcb(
struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
enum bufferevent_flush_mode mode, void *ctx)
{
struct bufferevent_filter_data_stuck *filter_data = ctx;
static int header_inserted = 0;
size_t payload_size;
size_t header_size = 0;
if (!header_inserted) {
char *header = calloc(filter_data->header_size, 1);
evbuffer_add(dst, header, filter_data->header_size);
free(header);
header_size = filter_data->header_size;
header_inserted = 1;
}
payload_size = evbuffer_get_length(src);
if (payload_size > dst_limit - header_size) {
payload_size = dst_limit - header_size;
}
tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size));
end:
return BEV_OK;
}
static void
test_bufferevent_filter_data_stuck(void *arg)
{
const size_t read_high_wm = 4096;
struct bufferevent_filter_data_stuck filter_data;
struct basic_test_data *data = arg;
struct bufferevent *pair[2];
struct bufferevent *filter = NULL;
int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS;
char payload[4096];
int payload_size = sizeof(payload);
memset(&filter_data, 0, sizeof(filter_data));
filter_data.header_size = 20;
tt_assert(bufferevent_pair_new(data->base, options, pair) == 0);
bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm);
bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm);
tt_assert(
filter =
bufferevent_filter_new(pair[1],
bufferevent_filter_data_stuck_inputcb,
NULL,
options,
NULL,
&filter_data));
bufferevent_setcb(filter,
bufferevent_filter_data_stuck_readcb,
NULL,
NULL,
&filter_data);
tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0);
bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm);
tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0);
event_base_dispatch(data->base);
tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size);
end:
if (pair[0])
bufferevent_free(pair[0]);
if (filter)
bufferevent_free(filter);
}
struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED),
@ -1299,6 +1399,9 @@ struct testcase_t bufferevent_testcases[] = {
{ "bufferevent_pair_flush",
test_bufferevent_pair_flush,
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
{ "bufferevent_filter_data_stuck",
test_bufferevent_filter_data_stuck,
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
END_OF_TESTCASES,
};