Add locking to event_base_loop.

This is harder than it sounds, since we need to make sure to
release the lock around the key call to the kernel (e.g.,
select, epoll_wait, kevent), AND we need to make sure that
none of the fields that are used in that call are touched by
anything that might be running concurrently in another
thread.  I managed to do this pretty well for everything but
poll().  With poll, I needed to introduce a copy of the
event_set array (event_set_copy) for poll() to work on while
other threads modify the original.

This patch also fixes a bug in win32.c where we called
realloc() instead of mm_realloc().

svn:r1450
This commit is contained in:
Nick Mathewson 2009-10-21 03:54:00 +00:00
parent 5082546682
commit 6b22e74aa1
9 changed files with 181 additions and 61 deletions

View File

@ -28,6 +28,8 @@ Changes in 2.0.3-alpha:
o Fix some bugs when using the old evdns interfaces to initialize the evdns module.
o Detect errors during bufferevent_connect(). Patch from Christopher Davis.
o Fix compilation for listener.h for C++ - missing extern "C". Patch from Ferenc Szalai.
o Make the event_base_loop() family of functions respect thread-safety better. This should clear up a few hard-to-debug race conditions.
o Fix a bug when using a specialized memory allocator on win32.
Changes in 2.0.2-alpha:

View File

@ -65,6 +65,7 @@ struct idx_info {
struct win32op {
int fd_setsz;
int resize_out_sets;
struct win_fd_set *readset_in;
struct win_fd_set *writeset_in;
struct win_fd_set *readset_out;
@ -103,16 +104,11 @@ realloc_fd_sets(struct win32op *op, size_t new_size)
assert(new_size >= 1);
size = FD_SET_ALLOC_SIZE(new_size);
if (!(op->readset_in = realloc(op->readset_in, size)))
if (!(op->readset_in = mm_realloc(op->readset_in, size)))
return (-1);
if (!(op->writeset_in = realloc(op->writeset_in, size)))
return (-1);
if (!(op->readset_out = realloc(op->readset_out, size)))
return (-1);
if (!(op->exset_out = realloc(op->exset_out, size)))
return (-1);
if (!(op->writeset_out = realloc(op->writeset_out, size)))
if (!(op->writeset_in = mm_realloc(op->writeset_in, size)))
return (-1);
op->resize_out_sets = 1;
op->fd_setsz = new_size;
return (0);
}
@ -286,6 +282,16 @@ win32_dispatch(struct event_base *base, struct timeval *tv)
int fd_count;
SOCKET s;
if (op->resize_out_sets) {
if (!(op->readset_out = mm_realloc(op->readset_out, size)))
return (-1);
if (!(op->exset_out = mm_realloc(op->exset_out, size)))
return (-1);
if (!(op->writeset_out = mm_realloc(op->writeset_out, size)))
return (-1);
op->resize_out_sets = 0;
}
fd_set_copy(win32op->readset_out, win32op->readset_in);
fd_set_copy(win32op->exset_out, win32op->readset_in);
fd_set_copy(win32op->writeset_out, win32op->writeset_in);
@ -301,11 +307,15 @@ win32_dispatch(struct event_base *base, struct timeval *tv)
return (0);
}
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = select(fd_count,
(struct fd_set*)win32op->readset_out,
(struct fd_set*)win32op->writeset_out,
(struct fd_set*)win32op->exset_out, tv);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
event_debug(("%s: select returned %d", __func__, res));
if(res <= 0) {

View File

@ -140,6 +140,8 @@ devpoll_init(struct event_base *base)
devpollop->dpfd = dpfd;
/* Initialize fields */
/* FIXME: allocating 'nfiles' worth of space here can be
* expensive and unnecessary. See how epoll.c does it instead. */
devpollop->events = mm_calloc(nfiles, sizeof(struct pollfd));
if (devpollop->events == NULL) {
mm_free(devpollop);
@ -179,8 +181,12 @@ devpoll_dispatch(struct event_base *base, struct timeval *tv)
dvp.dp_nfds = devpollop->nevents;
dvp.dp_timeout = timeout;
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = ioctl(devpollop->dpfd, DP_POLL, &dvp);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
event_warn("ioctl: DP_POLL");

View File

@ -51,6 +51,8 @@
#include "event-internal.h"
#include "evsignal-internal.h"
#include "event2/thread.h"
#include "evthread-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
@ -148,8 +150,12 @@ epoll_dispatch(struct event_base *base, struct timeval *tv)
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");

25
event.c
View File

@ -615,6 +615,7 @@ event_base_priority_init(struct event_base *base, int npriorities)
static int
event_haveevents(struct event_base *base)
{
/* Caller must hold th_base_lock */
return (base->event_count > 0);
}
@ -737,17 +738,16 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
static void
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c;
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
activeq = base->activequeues[i];
c = event_process_active_single_queue(base, activeq);
if (c < 0)
goto unlock;
return;
else if (c > 0)
break; /* Processed a real event; do not
* consider lower-priority events */
@ -757,9 +757,6 @@ event_process_active(struct event_base *base)
}
event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
unlock:
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
/*
@ -866,6 +863,10 @@ event_base_loop(struct event_base *base, int flags)
struct timeval *tv_p;
int res, done;
/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
/* clear time cache */
base->tv_cache.tv_sec = 0;
@ -933,6 +934,8 @@ event_base_loop(struct event_base *base, int flags)
/* clear time cache */
base->tv_cache.tv_sec = 0;
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}
@ -1496,12 +1499,12 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue,
static int
timeout_next(struct event_base *base, struct timeval **tv_p)
{
/* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
ev = min_heap_top(&base->timeheap);
if (ev == NULL) {
@ -1527,7 +1530,6 @@ timeout_next(struct event_base *base, struct timeval **tv_p)
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
@ -1540,6 +1542,7 @@ out:
static void
timeout_correct(struct event_base *base, struct timeval *tv)
{
/* Caller must hold th_base_lock. */
struct event **pev;
unsigned int size;
struct timeval off;
@ -1549,11 +1552,9 @@ timeout_correct(struct event_base *base, struct timeval *tv)
/* Check if time is running backwards */
gettime(base, tv);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (evutil_timercmp(tv, &base->event_tv, >=)) {
base->event_tv = *tv;
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
@ -1573,16 +1574,15 @@ timeout_correct(struct event_base *base, struct timeval *tv)
}
/* Now remember what the new time turned out to be. */
base->event_tv = *tv;
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
static void
timeout_process(struct event_base *base)
{
/* Caller must hold th_base_lock. */
struct timeval now;
struct event *ev;
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (min_heap_empty(&base->timeheap)) {
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
@ -1601,7 +1601,6 @@ timeout_process(struct event_base *base)
ev->ev_callback));
event_active_internal(ev, EV_TIMEOUT, 1);
}
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
static void

View File

@ -303,8 +303,14 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
}
}
if ((res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
(unsigned int *) &nevents, ts_p)) == -1) {
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
(unsigned int *) &nevents, ts_p);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (res == -1) {
if (errno == EINTR || errno == EAGAIN) {
evsig_process(base);
return (0);

View File

@ -67,8 +67,13 @@
struct kqop {
struct kevent *changes;
int nchanges;
int changes_size;
struct kevent *pend_changes;
int n_pend_changes;
int pend_changes_size;
struct kevent *events;
int nevents;
int events_size;
int kq;
pid_t pid;
};
@ -133,13 +138,21 @@ kq_init(struct event_base *base)
mm_free (kqueueop);
return (NULL);
}
kqueueop->events = mm_malloc(NEVENT * sizeof(struct kevent));
if (kqueueop->events == NULL) {
kqueueop->pend_changes = mm_malloc(NEVENT * sizeof(struct kevent));
if (kqueueop->pendchanges == NULL) {
mm_free (kqueueop->changes);
mm_free (kqueueop);
return (NULL);
}
kqueueop->nevents = NEVENT;
kqueueop->events = mm_malloc(NEVENT * sizeof(struct kevent));
if (kqueueop->events == NULL) {
mm_free (kqueueop->changes);
mm_free (kqueueop->pend_changes);
mm_free (kqueueop);
return (NULL);
}
kqueueop->events_size = kqueueop->changes_size =
kqueueop->pend_changes_size = NEVENT;
/* Check for Mac OS X kqueue bug. */
kqueueop->changes[0].ident = -1;
@ -171,36 +184,21 @@ kq_init(struct event_base *base)
static int
kq_insert(struct kqop *kqop, struct kevent *kev)
{
int nevents = kqop->nevents;
int size = kqop->changes_size;
if (kqop->nchanges == nevents) {
if (kqop->nchanges == size) {
struct kevent *newchange;
struct kevent *newresult;
nevents *= 2;
size *= 2;
newchange = mm_realloc(kqop->changes,
nevents * sizeof(struct kevent));
size * sizeof(struct kevent));
if (newchange == NULL) {
event_warn("%s: malloc", __func__);
return (-1);
}
kqop->changes = newchange;
newresult = mm_realloc(kqop->events,
nevents * sizeof(struct kevent));
/*
* If we fail, we don't have to worry about freeing,
* the next realloc will pick it up.
*/
if (newresult == NULL) {
event_warn("%s: malloc", __func__);
return (-1);
}
kqop->events = newresult;
kqop->nevents = nevents;
kqop->changes_size = size;
}
memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
@ -219,11 +217,17 @@ kq_sighandler(int sig)
/* Do nothing here */
}
/* Exchange the values of the lvalues 'a' and 'b', both of type 'tp'.
 * Each argument is expanded twice, so neither should have side
 * effects.  There is deliberately no semicolon after "while (0)": the
 * caller's own ';' finishes the statement, which keeps
 * "if (x) SWAP(...); else ..." well-formed. */
#define SWAP(tp,a,b) \
	do { \
		tp tmp_swap_var = (a); \
		(a) = (b); \
		(b) = tmp_swap_var; \
	} while (0)
static int
kq_dispatch(struct event_base *base, struct timeval *tv)
{
struct kqop *kqop = base->evbase;
struct kevent *changes = kqop->changes;
struct kevent *events = kqop->events;
struct timespec ts, *ts_p = NULL;
int i, res;
@ -233,9 +237,23 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
ts_p = &ts;
}
res = kevent(kqop->kq, changes, kqop->nchanges,
events, kqop->nevents, ts_p);
kqop->nchanges = 0;
/* We can't hold the lock while we're calling kevent(), so another
* thread might potentially mess with changes before the kernel has a
* chance to read it. Therefore, we need to keep the change list
* we're looking at in pend_changes, and let other threads mess with
* changes. */
SWAP(struct kevent *, kqop->changes, kqop->pend_changes);
SWAP(int, kqop->nchanges, kqop->npend_changes);
SWAP(int, kqop->changes_size, kqop->pend_changes_size);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = kevent(kqop->kq, kqop->pend_changes, kqop->npend_changes,
events, kqop->events_size, ts_p);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
kqop->npend_changes = 0;
if (res == -1) {
if (errno != EINTR) {
event_warn("kevent");
@ -289,6 +307,20 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
}
}
if (res == kqop->nevents) {
struct kevent *newresult;
int size = kqop->events_size;
/* We used all the events space that we have. Maybe we should
make it bigger. */
size *= 2;
newresult = mm_realloc(kqop->events,
size * sizeof(struct kevent));
if (newresult) {
kqop->events = newresult;
kqop->events_size = size;
}
}
return (0);
}

47
poll.c
View File

@ -50,6 +50,8 @@
#include "evsignal-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
#include "event2/thread.h"
#include "evthread-internal.h"
struct pollidx {
int idxplus1;
@ -57,8 +59,11 @@ struct pollidx {
struct pollop {
int event_count; /* Highest number alloc */
int nfds; /* Size of event_* */
int nfds; /* Highest number used */
int realloc_copy; /* True iff we must realloc
* event_set_copy */
struct pollfd *event_set;
struct pollfd *event_set_copy;
};
static void *poll_init (struct event_base *);
@ -119,14 +124,43 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
{
int res, i, j, msec = -1, nfds;
struct pollop *pop = base->evbase;
struct pollfd *event_set;
poll_check_ok(pop);
nfds = pop->nfds;
if (base->th_base_lock) {
/* If we're using this backend in a multithreaded setting,
* then we need to work on a copy of event_set, so that we can
* let other threads modify the main event_set while we're
* polling. If we're not multithreaded, then we'll skip the
* copy step here to save memory and time. */
if (pop->realloc_copy) {
struct pollfd *tmp = mm_realloc(pop->event_set_copy,
pop->event_count * sizeof(struct pollfd));
if (tmp == NULL) {
event_warn("realloc");
return -1;
}
pop->event_set_copy = tmp;
pop->realloc_copy = 0;
}
memcpy(pop->event_set_copy, pop->event_set,
sizeof(struct pollfd)*nfds);
event_set = pop->event_set_copy;
} else {
event_set = pop->event_set;
}
if (tv != NULL)
msec = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
nfds = pop->nfds;
res = poll(pop->event_set, nfds, msec);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = poll(event_set, nfds, msec);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
@ -150,7 +184,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
int what;
if (++i == nfds)
i = 0;
what = pop->event_set[i].revents;
what = event_set[i].revents;
if (!what)
continue;
@ -166,7 +200,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
if (res == 0)
continue;
evmap_io_active(base, pop->event_set[i].fd, res);
evmap_io_active(base, event_set[i].fd, res);
}
return (0);
@ -204,6 +238,7 @@ poll_add(struct event_base *base, int fd, short old, short events, void *_idx)
pop->event_set = tmp_event_set;
pop->event_count = tmp_event_count;
pop->realloc_copy = 1;
}
i = idx->idxplus1 - 1;
@ -289,6 +324,8 @@ poll_dealloc(struct event_base *base)
evsig_dealloc(base);
if (pop->event_set)
mm_free(pop->event_set);
if (pop->event_set_copy)
mm_free(pop->event_set_copy);
memset(pop, 0, sizeof(struct pollop));
mm_free(pop);

View File

@ -50,6 +50,8 @@
#include "event-internal.h"
#include "evsignal-internal.h"
#include "event2/thread.h"
#include "evthread-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
@ -67,6 +69,7 @@ typedef unsigned long fd_mask;
struct selectop {
int event_fds; /* Highest fd in fd set */
int event_fdsz;
int resize_out_sets;
fd_set *event_readset_in;
fd_set *event_writeset_in;
fd_set *event_readset_out;
@ -121,19 +124,38 @@ check_selectop(struct selectop *sop)
static int
select_dispatch(struct event_base *base, struct timeval *tv)
{
int res, i, j;
int res=0, i, j, nfds;
struct selectop *sop = base->evbase;
check_selectop(sop);
if (sop->resize_out_sets) {
fd_set *readset_out=NULL, *writeset_out=NULL;
size_t sz = sop->event_fdsz;
if (!(readset_out = mm_realloc(sop->event_readset_out, sz)))
return (-1);
if (!(writeset_out = mm_realloc(sop->event_writeset_out, sz))) {
mm_free(readset_out);
return (-1);
}
sop->event_readset_out = readset_out;
sop->event_writeset_out = writeset_out;
sop->resize_out_sets = 0;
}
memcpy(sop->event_readset_out, sop->event_readset_in,
sop->event_fdsz);
memcpy(sop->event_writeset_out, sop->event_writeset_in,
sop->event_fdsz);
res = select(sop->event_fds + 1, sop->event_readset_out,
nfds = sop->event_fds+1;
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
res = select(nfds, sop->event_readset_out,
sop->event_writeset_out, NULL, tv);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
check_selectop(sop);
if (res == -1) {
@ -151,9 +173,9 @@ select_dispatch(struct event_base *base, struct timeval *tv)
event_debug(("%s: select reports %d", __func__, res));
check_selectop(sop);
i = random() % (sop->event_fds+1);
for (j = 0; j <= sop->event_fds; ++j) {
if (++i >= sop->event_fds+1)
i = random() % (nfds+1);
for (j = 0; j <= nfds; ++j) {
if (++i >= nfds+1)
i = 0;
res = 0;
if (FD_ISSET(i, sop->event_readset_out))