mirror of
https://github.com/libevent/libevent.git
synced 2025-01-09 00:56:20 +08:00
be_filter: avoid data stuck under active watermarks
Suppose we have bufferevent filter attached to bufferevent socket. Read high watermark for bufferevent filter is configured to 4096 bytes. Socket receives 4343 bytes. Due to watermark, 4096 bytes are transferred from socket input buffer to filter input buffer and 247 bytes are left in bufferevent socket. Suppose that no more data is received through socket. At this point 247 bytes will sit forever in input buffer of bufferevent socket. The patch attached solves this issue registering read callback to filter's input buffer if it reaches its read high water mark and data was left in corresponding underlying's input buffer. This read callback calls filter process input function as soon as filter input buffer falls below its read high watermark and there still is data left in underlying input buffer. Callback is deregistered as soon as filter input buffer falls below its read high watermark.
This commit is contained in:
parent
285188963d
commit
b627ad88bf
@ -71,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev,
|
||||
short iotype, enum bufferevent_flush_mode mode);
|
||||
static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
|
||||
|
||||
static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
|
||||
const struct evbuffer_cb_info *cbinfo, void *arg);
|
||||
|
||||
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
|
||||
const struct evbuffer_cb_info *info, void *arg);
|
||||
|
||||
@ -79,6 +82,8 @@ struct bufferevent_filtered {
|
||||
|
||||
/** The bufferevent that we read/write filtered data from/to. */
|
||||
struct bufferevent *underlying;
|
||||
/** A callback on our inbuf to notice somebory removes data */
|
||||
struct evbuffer_cb_entry *inbuf_cb;
|
||||
/** A callback on our outbuf to notice when somebody adds data */
|
||||
struct evbuffer_cb_entry *outbuf_cb;
|
||||
/** True iff we have received an EOF callback from the underlying
|
||||
@ -203,6 +208,11 @@ bufferevent_filter_new(struct bufferevent *underlying,
|
||||
bufferevent_setcb(bufev_f->underlying,
|
||||
be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
|
||||
|
||||
bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
|
||||
bufferevent_filtered_inbuf_cb, bufev_f);
|
||||
evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
|
||||
EVBUFFER_CB_ENABLED);
|
||||
|
||||
bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
|
||||
bufferevent_filtered_outbuf_cb, bufev_f);
|
||||
|
||||
@ -251,6 +261,12 @@ be_filter_destruct(struct bufferevent *bev)
|
||||
EVUTIL_ASSERT(bevf);
|
||||
if (bevf->free_context)
|
||||
bevf->free_context(bevf->context);
|
||||
|
||||
if (bevf->inbuf_cb)
|
||||
evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
|
||||
|
||||
if (bevf->outbuf_cb)
|
||||
evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
|
||||
}
|
||||
|
||||
static int
|
||||
@ -418,9 +434,8 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
|
||||
}
|
||||
}
|
||||
|
||||
/* Called when the underlying socket has read. */
|
||||
static void
|
||||
be_filter_readcb(struct bufferevent *underlying, void *me_)
|
||||
be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
|
||||
{
|
||||
struct bufferevent_filtered *bevf = me_;
|
||||
enum bufferevent_filter_result res;
|
||||
@ -429,8 +444,6 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
|
||||
struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
|
||||
int processed_any = 0;
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
|
||||
// It's possible our refcount is 0 at this point if another thread free'd our filterevent
|
||||
EVUTIL_ASSERT(bufev_private->refcnt >= 0);
|
||||
|
||||
@ -449,11 +462,65 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
|
||||
/* XXX This should be in process_input, not here. There are
|
||||
* other places that can call process-input, and they should
|
||||
* force readcb calls as needed. */
|
||||
if (processed_any)
|
||||
if (processed_any) {
|
||||
bufferevent_trigger_nolock_(bufev, EV_READ, 0);
|
||||
if (evbuffer_get_length(underlying->input) > 0 &&
|
||||
be_readbuf_full(bevf, state)) {
|
||||
/* data left in underlying buffer and filter input buffer
|
||||
* hit its read high watermark.
|
||||
* Schedule callback to avoid data gets stuck in underlying
|
||||
* input buffer.
|
||||
*/
|
||||
evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
|
||||
EVBUFFER_CB_ENABLED);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Called when the size of our inbuf changes. */
|
||||
static void
|
||||
bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
|
||||
const struct evbuffer_cb_info *cbinfo, void *arg)
|
||||
{
|
||||
struct bufferevent_filtered *bevf = arg;
|
||||
enum bufferevent_flush_mode state;
|
||||
struct bufferevent *bev = downcast(bevf);
|
||||
|
||||
BEV_LOCK(bev);
|
||||
|
||||
if (bevf->got_eof)
|
||||
state = BEV_FINISHED;
|
||||
else
|
||||
state = BEV_NORMAL;
|
||||
|
||||
|
||||
if (!be_readbuf_full(bevf, state)) {
|
||||
/* opportunity to read data which was left in underlying
|
||||
* input buffer because filter input buffer hit read
|
||||
* high watermark.
|
||||
*/
|
||||
evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
|
||||
EVBUFFER_CB_ENABLED);
|
||||
if (evbuffer_get_length(bevf->underlying->input) > 0)
|
||||
be_filter_read_nolock_(bevf->underlying, bevf);
|
||||
}
|
||||
|
||||
BEV_UNLOCK(bufev);
|
||||
BEV_UNLOCK(bev);
|
||||
}
|
||||
|
||||
/* Called when the underlying socket has read. */
|
||||
static void
|
||||
be_filter_readcb(struct bufferevent *underlying, void *me_)
|
||||
{
|
||||
struct bufferevent_filtered *bevf = me_;
|
||||
struct bufferevent *bev = downcast(bevf);
|
||||
|
||||
BEV_LOCK(bev);
|
||||
|
||||
be_filter_read_nolock_(underlying, me_);
|
||||
|
||||
BEV_UNLOCK(bev);
|
||||
}
|
||||
|
||||
/* Called when the underlying socket has drained enough that we can write to
|
||||
|
Loading…
x
Reference in New Issue
Block a user