mirror of
https://github.com/libevent/libevent.git
synced 2025-01-31 09:12:55 +08:00
Always hold a reference to a bufferevent when calling its callbacks.
Rationale: we hold a lock on the bufferevent when its callbacks are executing, so we need to release the lock afterwards. But the callback might free the bufferevent, so unless we're holding a reference on the bufferevent, the lock might not be there for us to release. svn:r1347
This commit is contained in:
parent
e83a32dfe1
commit
a62283a9c3
@ -156,6 +156,9 @@ void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
|
||||
int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
|
||||
/** Internal: Increment the reference count on bufev. */
|
||||
void bufferevent_incref(struct bufferevent *bufev);
|
||||
/** Internal: Lock bufev and increase its reference count.
|
||||
* unlocking it otherwise. */
|
||||
void _bufferevent_incref_and_lock(struct bufferevent *bufev);
|
||||
/** Internal: Drop the reference count on bufev, freeing as necessary, and
|
||||
* unlocking it otherwise. */
|
||||
void _bufferevent_decref_and_unlock(struct bufferevent *bufev);
|
||||
|
@ -143,7 +143,7 @@ bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg)
|
||||
void
|
||||
_bufferevent_run_readcb(struct bufferevent *bufev)
|
||||
{
|
||||
/* Requires lock. */
|
||||
/* Requires that we hold the lock and a reference */
|
||||
struct bufferevent_private *p =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
|
||||
@ -161,7 +161,7 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
|
||||
void
|
||||
_bufferevent_run_writecb(struct bufferevent *bufev)
|
||||
{
|
||||
/* Requires lock. */
|
||||
/* Requires that we hold the lock and a reference */
|
||||
struct bufferevent_private *p =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
|
||||
@ -179,7 +179,7 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
|
||||
void
|
||||
_bufferevent_run_eventcb(struct bufferevent *bufev, short what)
|
||||
{
|
||||
/* Requires lock. */
|
||||
/* Requires that we hold the lock and a reference */
|
||||
struct bufferevent_private *p =
|
||||
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
|
||||
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
|
||||
@ -460,6 +460,15 @@ bufferevent_flush(struct bufferevent *bufev,
|
||||
return r;
|
||||
}
|
||||
|
||||
void
|
||||
_bufferevent_incref_and_lock(struct bufferevent *bufev)
|
||||
{
|
||||
struct bufferevent_private *bufev_private =
|
||||
BEV_UPCAST(bufev);
|
||||
BEV_LOCK(bufev);
|
||||
++bufev_private->refcnt;
|
||||
}
|
||||
|
||||
void
|
||||
_bufferevent_decref_and_unlock(struct bufferevent *bufev)
|
||||
{
|
||||
@ -574,17 +583,17 @@ static void
|
||||
bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
|
||||
{
|
||||
struct bufferevent *bev = ctx;
|
||||
BEV_LOCK(bev);
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
|
||||
BEV_UNLOCK(bev);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
static void
|
||||
bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
|
||||
{
|
||||
struct bufferevent *bev = ctx;
|
||||
BEV_LOCK(bev);
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
|
||||
BEV_UNLOCK(bev);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -152,7 +152,7 @@ be_async_outbuf_callback(struct evbuffer *buf,
|
||||
/* If we successfully wrote from the outbuf, or we added data to the
|
||||
* outbuf and were not writing before, we may want to write now. */
|
||||
|
||||
BEV_LOCK(bev);
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
if (cbinfo->n_deleted) {
|
||||
/* XXXX can't detect 0-length write completion */
|
||||
bev_async->write_in_progress = 0;
|
||||
@ -169,7 +169,7 @@ be_async_outbuf_callback(struct evbuffer *buf,
|
||||
_bufferevent_run_writecb(bev);
|
||||
}
|
||||
|
||||
BEV_UNLOCK(bev);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
|
||||
static void
|
||||
@ -183,7 +183,7 @@ be_async_inbuf_callback(struct evbuffer *buf,
|
||||
/* If we successfully read into the inbuf, or we drained data from
|
||||
* the inbuf and were not reading before, we may want to read now */
|
||||
|
||||
BEV_LOCK(bev);
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
if (cbinfo->n_added) {
|
||||
/* XXXX can't detect 0-length read completion */
|
||||
bev_async->read_in_progress = 0;
|
||||
@ -200,7 +200,7 @@ be_async_inbuf_callback(struct evbuffer *buf,
|
||||
_bufferevent_run_readcb(bev);
|
||||
}
|
||||
|
||||
BEV_UNLOCK(bev);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -286,6 +286,7 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
|
||||
enum bufferevent_flush_mode state,
|
||||
int *processed_out)
|
||||
{
|
||||
/* Requires references and lock: might call writecb */
|
||||
enum bufferevent_filter_result res = BEV_OK;
|
||||
struct bufferevent *bufev = downcast(bevf);
|
||||
int again = 0;
|
||||
@ -365,12 +366,15 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
|
||||
const struct evbuffer_cb_info *cbinfo, void *arg)
|
||||
{
|
||||
struct bufferevent_filtered *bevf = arg;
|
||||
struct bufferevent *bev = downcast(bevf);
|
||||
|
||||
if (cbinfo->n_added) {
|
||||
int processed_any = 0;
|
||||
/* Somebody added more data to the output buffer. Try to
|
||||
* process it, if we should. */
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
}
|
||||
|
||||
@ -384,6 +388,8 @@ be_filter_readcb(struct bufferevent *underlying, void *_me)
|
||||
struct bufferevent *bufev = downcast(bevf);
|
||||
int processed_any = 0;
|
||||
|
||||
_bufferevent_incref_and_lock(bufev);
|
||||
|
||||
if (bevf->got_eof)
|
||||
state = BEV_FINISHED;
|
||||
else
|
||||
@ -391,10 +397,15 @@ be_filter_readcb(struct bufferevent *underlying, void *_me)
|
||||
|
||||
res = be_filter_process_input(bevf, state, &processed_any);
|
||||
|
||||
/* XXX This should be in process_input, not here. There are
|
||||
* other places that can call process-input, and they should
|
||||
* force readcb calls as needed. */
|
||||
if (processed_any &&
|
||||
evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
|
||||
bufev->readcb != NULL)
|
||||
_bufferevent_run_readcb(bufev);
|
||||
|
||||
_bufferevent_decref_and_unlock(bufev);
|
||||
}
|
||||
|
||||
/* Called when the underlying socket has drained enough that we can write to
|
||||
@ -403,9 +414,12 @@ static void
|
||||
be_filter_writecb(struct bufferevent *underlying, void *_me)
|
||||
{
|
||||
struct bufferevent_filtered *bevf = _me;
|
||||
struct bufferevent *bev = downcast(bevf);
|
||||
int processed_any = 0;
|
||||
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
|
||||
/* Called when the underlying socket has given us an error */
|
||||
@ -415,9 +429,11 @@ be_filter_eventcb(struct bufferevent *underlying, short what, void *_me)
|
||||
struct bufferevent_filtered *bevf = _me;
|
||||
struct bufferevent *bev = downcast(bevf);
|
||||
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
/* All we can really to is tell our own eventcb. */
|
||||
if (bev->errorcb)
|
||||
_bufferevent_run_eventcb(bev, what);
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
|
||||
static int
|
||||
@ -428,6 +444,8 @@ be_filter_flush(struct bufferevent *bufev,
|
||||
int processed_any = 0;
|
||||
assert(bevf);
|
||||
|
||||
_bufferevent_incref_and_lock(bufev);
|
||||
|
||||
if (iotype & EV_READ) {
|
||||
be_filter_process_input(bevf, mode, &processed_any);
|
||||
}
|
||||
@ -438,6 +456,8 @@ be_filter_flush(struct bufferevent *bufev,
|
||||
/* XXX does this want to recursively call lower-level flushes? */
|
||||
bufferevent_flush(bevf->underlying, iotype, mode);
|
||||
|
||||
_bufferevent_decref_and_unlock(bufev);
|
||||
|
||||
return processed_any;
|
||||
}
|
||||
|
||||
|
@ -66,6 +66,25 @@ upcast(struct bufferevent *bev)
|
||||
|
||||
#define downcast(bev_pair) (&(bev_pair)->bev.bev)
|
||||
|
||||
static inline void
|
||||
incref_and_lock(struct bufferevent *b)
|
||||
{
|
||||
struct bufferevent_pair *bevp;
|
||||
_bufferevent_incref_and_lock(b);
|
||||
bevp = upcast(b);
|
||||
if (bevp->partner)
|
||||
_bufferevent_incref_and_lock(downcast(bevp->partner));
|
||||
}
|
||||
|
||||
static inline void
|
||||
decref_and_unlock(struct bufferevent *b)
|
||||
{
|
||||
struct bufferevent_pair *bevp = upcast(b);
|
||||
if (bevp->partner)
|
||||
_bufferevent_decref_and_unlock(downcast(bevp->partner));
|
||||
_bufferevent_decref_and_unlock(b);
|
||||
}
|
||||
|
||||
/* XXX Handle close */
|
||||
|
||||
static void be_pair_outbuf_cb(struct evbuffer *,
|
||||
@ -189,6 +208,8 @@ be_pair_outbuf_cb(struct evbuffer *outbuf,
|
||||
struct bufferevent_pair *bev_pair = arg;
|
||||
struct bufferevent_pair *partner = bev_pair->partner;
|
||||
|
||||
incref_and_lock(downcast(bev_pair));
|
||||
|
||||
if (info->n_added > info->n_deleted && partner) {
|
||||
/* We got more data. If the other side's reading, then
|
||||
hand it over. */
|
||||
@ -196,6 +217,8 @@ be_pair_outbuf_cb(struct evbuffer *outbuf,
|
||||
be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
|
||||
}
|
||||
}
|
||||
|
||||
decref_and_unlock(downcast(bev_pair));
|
||||
}
|
||||
|
||||
static int
|
||||
@ -204,6 +227,8 @@ be_pair_enable(struct bufferevent *bufev, short events)
|
||||
struct bufferevent_pair *bev_p = upcast(bufev);
|
||||
struct bufferevent_pair *partner = bev_p->partner;
|
||||
|
||||
incref_and_lock(bufev);
|
||||
|
||||
_bufferevent_generic_adj_timeouts(bufev);
|
||||
|
||||
/* We're starting to read! Does the other side have anything to write?*/
|
||||
@ -216,6 +241,7 @@ be_pair_enable(struct bufferevent *bufev, short events)
|
||||
be_pair_wants_to_talk(bev_p, partner)) {
|
||||
be_pair_transfer(bufev, downcast(partner), 0);
|
||||
}
|
||||
decref_and_unlock(bufev);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -245,6 +271,7 @@ be_pair_flush(struct bufferevent *bev, short iotype,
|
||||
{
|
||||
struct bufferevent_pair *bev_p = upcast(bev);
|
||||
struct bufferevent *partner;
|
||||
incref_and_lock(bev);
|
||||
if (!bev_p->partner)
|
||||
return -1;
|
||||
|
||||
@ -263,6 +290,7 @@ be_pair_flush(struct bufferevent *bev, short iotype,
|
||||
if (partner->errorcb)
|
||||
_bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF);
|
||||
}
|
||||
decref_and_unlock(bev);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -122,7 +122,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
|
||||
short what = BEV_EVENT_READING;
|
||||
int howmuch = -1;
|
||||
|
||||
BEV_LOCK(arg);
|
||||
_bufferevent_incref_and_lock(bufev);
|
||||
|
||||
if (event == EV_TIMEOUT) {
|
||||
what |= BEV_EVENT_TIMEOUT;
|
||||
@ -178,7 +178,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
|
||||
_bufferevent_run_eventcb(bufev, what);
|
||||
|
||||
done:
|
||||
BEV_UNLOCK(bufev);
|
||||
_bufferevent_decref_and_unlock(bufev);
|
||||
}
|
||||
|
||||
static void
|
||||
@ -190,7 +190,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
int res = 0;
|
||||
short what = BEV_EVENT_WRITING;
|
||||
|
||||
BEV_LOCK(bufev);
|
||||
_bufferevent_incref_and_lock(bufev);
|
||||
|
||||
if (event == EV_TIMEOUT) {
|
||||
what |= BEV_EVENT_TIMEOUT;
|
||||
@ -245,7 +245,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
_bufferevent_run_eventcb(bufev, what);
|
||||
|
||||
done:
|
||||
BEV_UNLOCK(bufev);
|
||||
_bufferevent_decref_and_unlock(bufev);
|
||||
}
|
||||
|
||||
struct bufferevent *
|
||||
@ -288,18 +288,21 @@ bufferevent_socket_connect(struct bufferevent *bev,
|
||||
int family = sa->sa_family;
|
||||
evutil_socket_t fd;
|
||||
int made_socket = 0;
|
||||
int result = -1;
|
||||
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
|
||||
if (!bufev_p)
|
||||
return -1;
|
||||
goto done;
|
||||
|
||||
fd = event_get_fd(&bev->ev_read);
|
||||
if (fd < 0) {
|
||||
made_socket = 1;
|
||||
if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
|
||||
return -1;
|
||||
goto done;
|
||||
if (evutil_make_socket_nonblocking(fd) < 0) {
|
||||
EVUTIL_CLOSESOCKET(fd);
|
||||
return -1;
|
||||
goto done;
|
||||
}
|
||||
be_socket_setfd(bev, fd);
|
||||
}
|
||||
@ -309,7 +312,8 @@ bufferevent_socket_connect(struct bufferevent *bev,
|
||||
if (EVUTIL_ERR_CONNECT_RETRIABLE(e)) {
|
||||
if (! be_socket_enable(bev, EV_WRITE)) {
|
||||
bufev_p->connecting = 1;
|
||||
return 0;
|
||||
result = 0;
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
|
||||
@ -319,7 +323,10 @@ bufferevent_socket_connect(struct bufferevent *bev,
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_CONNECTED);
|
||||
}
|
||||
|
||||
return 0;
|
||||
result = 0;
|
||||
done:
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
|
Loading…
x
Reference in New Issue
Block a user