diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 5d5f992b..d47f9452 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -71,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); +static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, void *arg); + static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg); @@ -79,6 +82,8 @@ struct bufferevent_filtered { /** The bufferevent that we read/write filtered data from/to. */ struct bufferevent *underlying; + /** A callback on our inbuf to notice somebory removes data */ + struct evbuffer_cb_entry *inbuf_cb; /** A callback on our outbuf to notice when somebody adds data */ struct evbuffer_cb_entry *outbuf_cb; /** True iff we have received an EOF callback from the underlying @@ -203,6 +208,11 @@ bufferevent_filter_new(struct bufferevent *underlying, bufferevent_setcb(bufev_f->underlying, be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); + bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input, + bufferevent_filtered_inbuf_cb, bufev_f); + evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb, + EVBUFFER_CB_ENABLED); + bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); @@ -251,6 +261,12 @@ be_filter_destruct(struct bufferevent *bev) EVUTIL_ASSERT(bevf); if (bevf->free_context) bevf->free_context(bevf->context); + + if (bevf->inbuf_cb) + evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb); + + if (bevf->outbuf_cb) + evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb); } static int @@ -418,9 +434,8 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf, } } -/* Called when the underlying socket has read. */ static void -be_filter_readcb(struct bufferevent *underlying, void *me_) +be_filter_read_nolock_(struct bufferevent *underlying, void *me_) { struct bufferevent_filtered *bevf = me_; enum bufferevent_filter_result res; @@ -429,8 +444,6 @@ be_filter_readcb(struct bufferevent *underlying, void *me_) struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); int processed_any = 0; - BEV_LOCK(bufev); - // It's possible our refcount is 0 at this point if another thread free'd our filterevent EVUTIL_ASSERT(bufev_private->refcnt >= 0); @@ -449,11 +462,65 @@ be_filter_readcb(struct bufferevent *underlying, void *me_) /* XXX This should be in process_input, not here. There are * other places that can call process-input, and they should * force readcb calls as needed. */ - if (processed_any) + if (processed_any) { bufferevent_trigger_nolock_(bufev, EV_READ, 0); + if (evbuffer_get_length(underlying->input) > 0 && + be_readbuf_full(bevf, state)) { + /* data left in underlying buffer and filter input buffer + * hit its read high watermark. + * Schedule callback to avoid data gets stuck in underlying + * input buffer. + */ + evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb, + EVBUFFER_CB_ENABLED); + } + } + } +} + +/* Called when the size of our inbuf changes. */ +static void +bufferevent_filtered_inbuf_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, void *arg) +{ + struct bufferevent_filtered *bevf = arg; + enum bufferevent_flush_mode state; + struct bufferevent *bev = downcast(bevf); + + BEV_LOCK(bev); + + if (bevf->got_eof) + state = BEV_FINISHED; + else + state = BEV_NORMAL; + + + if (!be_readbuf_full(bevf, state)) { + /* opportunity to read data which was left in underlying + * input buffer because filter input buffer hit read + * high watermark. + */ + evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb, + EVBUFFER_CB_ENABLED); + if (evbuffer_get_length(bevf->underlying->input) > 0) + be_filter_read_nolock_(bevf->underlying, bevf); } - BEV_UNLOCK(bufev); + BEV_UNLOCK(bev); +} + +/* Called when the underlying socket has read. */ +static void +be_filter_readcb(struct bufferevent *underlying, void *me_) +{ + struct bufferevent_filtered *bevf = me_; + struct bufferevent *bev = downcast(bevf); + + BEV_LOCK(bev); + + be_filter_read_nolock_(underlying, me_); + + BEV_UNLOCK(bev); } /* Called when the underlying socket has drained enough that we can write to diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 6f214a7d..27df02f1 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -1227,6 +1227,106 @@ end: bufferevent_free(bev2); } +struct bufferevent_filter_data_stuck { + size_t header_size; + size_t total_read; +}; + +static void +bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg) +{ + struct bufferevent_filter_data_stuck *filter_data = arg; + struct evbuffer *input = bufferevent_get_input(bev); + size_t read_size = evbuffer_get_length(input); + evbuffer_drain(input, read_size); + filter_data->total_read += read_size; +} + +/** + * This filter prepends header once before forwarding data. + */ +static enum bufferevent_filter_result +bufferevent_filter_data_stuck_inputcb( + struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit, + enum bufferevent_flush_mode mode, void *ctx) +{ + struct bufferevent_filter_data_stuck *filter_data = ctx; + static int header_inserted = 0; + size_t payload_size; + size_t header_size = 0; + + if (!header_inserted) { + char *header = calloc(filter_data->header_size, 1); + evbuffer_add(dst, header, filter_data->header_size); + free(header); + header_size = filter_data->header_size; + header_inserted = 1; + } + + payload_size = evbuffer_get_length(src); + if (payload_size > dst_limit - header_size) { + payload_size = dst_limit - header_size; + } + + tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size)); + +end: + return BEV_OK; +} + +static void +test_bufferevent_filter_data_stuck(void *arg) +{ + const size_t read_high_wm = 4096; + struct bufferevent_filter_data_stuck filter_data; + struct basic_test_data *data = arg; + struct bufferevent *pair[2]; + struct bufferevent *filter = NULL; + + int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS; + + char payload[4096]; + int payload_size = sizeof(payload); + + memset(&filter_data, 0, sizeof(filter_data)); + filter_data.header_size = 20; + + tt_assert(bufferevent_pair_new(data->base, options, pair) == 0); + + bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm); + bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm); + + tt_assert( + filter = + bufferevent_filter_new(pair[1], + bufferevent_filter_data_stuck_inputcb, + NULL, + options, + NULL, + &filter_data)); + + bufferevent_setcb(filter, + bufferevent_filter_data_stuck_readcb, + NULL, + NULL, + &filter_data); + + tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0); + + bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm); + + tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0); + + event_base_dispatch(data->base); + + tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size); +end: + if (pair[0]) + bufferevent_free(pair[0]); + if (filter) + bufferevent_free(filter); +} + struct testcase_t bufferevent_testcases[] = { LEGACY(bufferevent, TT_ISOLATED), @@ -1299,6 +1399,9 @@ struct testcase_t bufferevent_testcases[] = { { "bufferevent_pair_flush", test_bufferevent_pair_flush, TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, + { "bufferevent_filter_data_stuck", + test_bufferevent_filter_data_stuck, + TT_FORK|TT_NEED_BASE, &basic_setup, NULL }, END_OF_TESTCASES, };