Make kqueue use changelists.

This fixes a bug in kqueue identified by Charles Kerr and various
Transmission users, where adding and deleting an event in succession
would make the event get reported, even if we didn't actually want to
see it.

Of course, this also makes the array of changes passed to kevent
smaller, which could help performance.
This commit is contained in:
Nick Mathewson 2010-01-14 16:31:05 -05:00
parent 27308aae4d
commit 45e5ae3717

186
kqueue.c
View File

@ -60,15 +60,14 @@
#include "evmap-internal.h"
#include "event2/thread.h"
#include "evthread-internal.h"
#include "changelist-internal.h"
#define NEVENT 64
struct kqop {
struct kevent *changes;
int nchanges;
int changes_size;
struct kevent *pend_changes;
int n_pend_changes;
int pend_changes_size;
struct kevent *events;
@ -80,24 +79,21 @@ struct kqop {
static void kqop_free(struct kqop *kqop);
static void *kq_init (struct event_base *);
static int kq_add (struct event_base *, int, short, short, void *);
static int kq_del (struct event_base *, int, short, short, void *);
static int kq_sig_add (struct event_base *, int, short, short, void *);
static int kq_sig_del (struct event_base *, int, short, short, void *);
static int kq_dispatch (struct event_base *, struct timeval *);
static int kq_insert (struct kqop *, struct kevent *);
static void kq_dealloc (struct event_base *);
const struct eventop kqops = {
"kqueue",
kq_init,
kq_add,
kq_del,
event_changelist_add,
event_changelist_del,
kq_dispatch,
kq_dealloc,
1 /* need reinit */,
EV_FEATURE_ET|EV_FEATURE_O1|EV_FEATURE_FDS,
0
EVENT_CHANGELIST_FDINFO_SIZE
};
static const struct eventop kqsigops = {
@ -121,7 +117,7 @@ kq_init(struct event_base *base)
if (!(kqueueop = mm_calloc(1, sizeof(struct kqop))))
return (NULL);
/* Initialize the kernel queue */
/* Initialize the kernel queue */
if ((kq = kqueue()) == -1) {
event_warn("kqueue");
@ -174,42 +170,34 @@ err:
return (NULL);
}
static int
kq_insert(struct kqop *kqop, struct kevent *kev)
{
int size = kqop->changes_size;
if (kqop->nchanges == size) {
struct kevent *newchange;
size *= 2;
newchange = mm_realloc(kqop->changes,
size * sizeof(struct kevent));
if (newchange == NULL) {
event_warn("%s: malloc", __func__);
return (-1);
}
kqop->changes = newchange;
kqop->changes_size = size;
}
memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
event_debug(("%s: fd %d %s%s",
__func__, (int)kev->ident,
kev->filter == EVFILT_READ ? "EVFILT_READ" : "EVFILT_WRITE",
kev->flags == EV_DELETE ? " (del)" : ""));
return (0);
}
static void
kq_sighandler(int sig)
{
/* Do nothing here */
}
static void
kq_setup_kevent(struct kevent *out, evutil_socket_t fd, int filter, short change)
{
memset(out, 0, sizeof(out));
out->ident = fd;
out->filter = filter;
if (change & EV_CHANGE_ADD) {
out->flags = EV_ADD;
if (change & EV_ET)
out->flags |= EV_CLEAR;
#ifdef NOTE_EOF
/* Make it behave like select() and poll() */
if (filter == EVFILT_READ)
out->fflags = NOTE_EOF;
#endif
} else {
EVUTIL_ASSERT(change & EV_CHANGE_DEL);
out->flags = EV_DELETE;
}
}
#define SWAP(tp,a,b) \
do { \
tp tmp_swap_var = (a); \
@ -217,36 +205,79 @@ kq_sighandler(int sig)
b = tmp_swap_var; \
} while (0);
static int
kq_build_changes_list(const struct event_changelist *changelist,
struct kqop *kqop)
{
int i;
int n_changes = 0;
for (i = 0; i < changelist->n_changes; ++i) {
struct event_change *in_ch = &changelist->changes[i];
struct kevent *out_ch;
if (n_changes >= kqop->changes_size - 1) {
int newsize = kqop->changes_size * 2;
struct kevent *newchanges;
newchanges = mm_realloc(kqop->changes,
newsize * sizeof(struct kevent));
if (newchanges == NULL) {
event_warn("%s: realloc", __func__);
return (-1);
}
kqop->changes = newchanges;
kqop->changes_size = newsize;
}
if (in_ch->read_change) {
out_ch = &kqop->changes[n_changes++];
kq_setup_kevent(out_ch, in_ch->fd, EVFILT_READ,
in_ch->read_change);
}
if (in_ch->write_change) {
out_ch = &kqop->changes[n_changes++];
kq_setup_kevent(out_ch, in_ch->fd, EVFILT_WRITE,
in_ch->write_change);
}
}
return n_changes;
}
static int
kq_dispatch(struct event_base *base, struct timeval *tv)
{
struct kqop *kqop = base->evbase;
struct kevent *events = kqop->events;
struct timespec ts, *ts_p = NULL;
int i, res;
int i, n_changes, res;
if (tv != NULL) {
TIMEVAL_TO_TIMESPEC(tv, &ts);
ts_p = &ts;
}
/* Build "changes" from "base->changes" */
n_changes = kq_build_changes_list(&base->changelist, kqop);
if (n_changes < 0)
return -1;
event_changelist_remove_all(&base->changelist, base);
/* We can't hold the lock while we're calling kqueue, so another
* thread might potentially mess with changes before the kernel has a
* chance to read it. Therefore, we need to keep the change list
* we're looking at in pend_changes, and let other threads mess with
* changes. */
SWAP(struct kevent *, kqop->changes, kqop->pend_changes);
SWAP(int, kqop->nchanges, kqop->n_pend_changes);
SWAP(int, kqop->changes_size, kqop->pend_changes_size);
EVBASE_RELEASE_LOCK(base, th_base_lock);
res = kevent(kqop->kq, kqop->pend_changes, kqop->n_pend_changes,
res = kevent(kqop->kq, kqop->pend_changes, n_changes,
events, kqop->events_size, ts_p);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
kqop->n_pend_changes = 0;
if (res == -1) {
if (errno != EINTR) {
event_warn("kevent");
@ -317,75 +348,6 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
return (0);
}
static int
kq_add(struct event_base *base, int fd, short old, short events, void *p)
{
struct kqop *kqop = base->evbase;
struct kevent kev;
(void) p;
if (events & EV_READ) {
memset(&kev, 0, sizeof(kev));
kev.ident = fd;
kev.filter = EVFILT_READ;
#ifdef NOTE_EOF
/* Make it behave like select() and poll() */
kev.fflags = NOTE_EOF;
#endif
kev.flags = EV_ADD;
if (events & EV_ET)
kev.flags |= EV_CLEAR;
if (kq_insert(kqop, &kev) == -1)
return (-1);
}
if (events & EV_WRITE) {
memset(&kev, 0, sizeof(kev));
kev.ident = fd;
kev.filter = EVFILT_WRITE;
kev.flags = EV_ADD;
if (events & EV_ET)
kev.flags |= EV_CLEAR;
if (kq_insert(kqop, &kev) == -1)
return (-1);
}
return (0);
}
static int
kq_del(struct event_base *base, int fd, short old, short events, void *p)
{
struct kqop *kqop = base->evbase;
struct kevent kev;
(void) p;
if (events & EV_READ) {
memset(&kev, 0, sizeof(kev));
kev.ident = fd;
kev.filter = EVFILT_READ;
kev.flags = EV_DELETE;
if (kq_insert(kqop, &kev) == -1)
return (-1);
}
if (events & EV_WRITE) {
memset(&kev, 0, sizeof(kev));
kev.ident = fd;
kev.filter = EVFILT_WRITE;
kev.flags = EV_DELETE;
if (kq_insert(kqop, &kev) == -1)
return (-1);
}
return (0);
}
static void
kqop_free(struct kqop *kqop)
{