diff --git a/kqueue.c b/kqueue.c index ca557bf5..c5d3b421 100644 --- a/kqueue.c +++ b/kqueue.c @@ -60,15 +60,14 @@ #include "evmap-internal.h" #include "event2/thread.h" #include "evthread-internal.h" +#include "changelist-internal.h" #define NEVENT 64 struct kqop { struct kevent *changes; - int nchanges; int changes_size; struct kevent *pend_changes; - int n_pend_changes; int pend_changes_size; struct kevent *events; @@ -80,24 +79,21 @@ struct kqop { static void kqop_free(struct kqop *kqop); static void *kq_init (struct event_base *); -static int kq_add (struct event_base *, int, short, short, void *); -static int kq_del (struct event_base *, int, short, short, void *); static int kq_sig_add (struct event_base *, int, short, short, void *); static int kq_sig_del (struct event_base *, int, short, short, void *); static int kq_dispatch (struct event_base *, struct timeval *); -static int kq_insert (struct kqop *, struct kevent *); static void kq_dealloc (struct event_base *); const struct eventop kqops = { "kqueue", kq_init, - kq_add, - kq_del, + event_changelist_add, + event_changelist_del, kq_dispatch, kq_dealloc, 1 /* need reinit */, EV_FEATURE_ET|EV_FEATURE_O1|EV_FEATURE_FDS, - 0 + EVENT_CHANGELIST_FDINFO_SIZE }; static const struct eventop kqsigops = { @@ -121,7 +117,7 @@ kq_init(struct event_base *base) if (!(kqueueop = mm_calloc(1, sizeof(struct kqop)))) return (NULL); - /* Initialize the kernel queue */ +/* Initialize the kernel queue */ if ((kq = kqueue()) == -1) { event_warn("kqueue"); @@ -174,42 +170,34 @@ err: return (NULL); } -static int -kq_insert(struct kqop *kqop, struct kevent *kev) -{ - int size = kqop->changes_size; - - if (kqop->nchanges == size) { - struct kevent *newchange; - - size *= 2; - - newchange = mm_realloc(kqop->changes, - size * sizeof(struct kevent)); - if (newchange == NULL) { - event_warn("%s: malloc", __func__); - return (-1); - } - kqop->changes = newchange; - kqop->changes_size = size; - } - - memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent)); - - event_debug(("%s: fd %d %s%s", - __func__, (int)kev->ident, - kev->filter == EVFILT_READ ? "EVFILT_READ" : "EVFILT_WRITE", - kev->flags == EV_DELETE ? " (del)" : "")); - - return (0); -} - static void kq_sighandler(int sig) { /* Do nothing here */ } +static void +kq_setup_kevent(struct kevent *out, evutil_socket_t fd, int filter, short change) +{ + memset(out, 0, sizeof(out)); + out->ident = fd; + out->filter = filter; + + if (change & EV_CHANGE_ADD) { + out->flags = EV_ADD; + if (change & EV_ET) + out->flags |= EV_CLEAR; +#ifdef NOTE_EOF + /* Make it behave like select() and poll() */ + if (filter == EVFILT_READ) + out->fflags = NOTE_EOF; +#endif + } else { + EVUTIL_ASSERT(change & EV_CHANGE_DEL); + out->flags = EV_DELETE; + } +} + #define SWAP(tp,a,b) \ do { \ tp tmp_swap_var = (a); \ @@ -217,36 +205,79 @@ kq_sighandler(int sig) b = tmp_swap_var; \ } while (0); + +static int +kq_build_changes_list(const struct event_changelist *changelist, + struct kqop *kqop) +{ + int i; + int n_changes = 0; + + for (i = 0; i < changelist->n_changes; ++i) { + struct event_change *in_ch = &changelist->changes[i]; + struct kevent *out_ch; + if (n_changes >= kqop->changes_size - 1) { + int newsize = kqop->changes_size * 2; + struct kevent *newchanges; + + newchanges = mm_realloc(kqop->changes, + newsize * sizeof(struct kevent)); + if (newchanges == NULL) { + event_warn("%s: realloc", __func__); + return (-1); + } + kqop->changes = newchanges; + kqop->changes_size = newsize; + } + if (in_ch->read_change) { + out_ch = &kqop->changes[n_changes++]; + kq_setup_kevent(out_ch, in_ch->fd, EVFILT_READ, + in_ch->read_change); + } + if (in_ch->write_change) { + out_ch = &kqop->changes[n_changes++]; + kq_setup_kevent(out_ch, in_ch->fd, EVFILT_WRITE, + in_ch->write_change); + } + } + return n_changes; +} + static int kq_dispatch(struct event_base *base, struct timeval *tv) { struct kqop *kqop = base->evbase; struct kevent *events = kqop->events; struct timespec ts, *ts_p = NULL; - int i, res; + int i, n_changes, res; if (tv != NULL) { TIMEVAL_TO_TIMESPEC(tv, &ts); ts_p = &ts; } + /* Build "changes" from "base->changes" */ + n_changes = kq_build_changes_list(&base->changelist, kqop); + if (n_changes < 0) + return -1; + + event_changelist_remove_all(&base->changelist, base); + /* We can't hold the lock while we're calling kqueue, so another * thread might potentially mess with changes before the kernel has a * chance to read it. Therefore, we need to keep the change list * we're looking at in pend_changes, and let other threads mess with * changes. */ SWAP(struct kevent *, kqop->changes, kqop->pend_changes); - SWAP(int, kqop->nchanges, kqop->n_pend_changes); SWAP(int, kqop->changes_size, kqop->pend_changes_size); EVBASE_RELEASE_LOCK(base, th_base_lock); - res = kevent(kqop->kq, kqop->pend_changes, kqop->n_pend_changes, + res = kevent(kqop->kq, kqop->pend_changes, n_changes, events, kqop->events_size, ts_p); EVBASE_ACQUIRE_LOCK(base, th_base_lock); - kqop->n_pend_changes = 0; if (res == -1) { if (errno != EINTR) { event_warn("kevent"); @@ -317,75 +348,6 @@ kq_dispatch(struct event_base *base, struct timeval *tv) return (0); } - -static int -kq_add(struct event_base *base, int fd, short old, short events, void *p) -{ - struct kqop *kqop = base->evbase; - struct kevent kev; - (void) p; - - if (events & EV_READ) { - memset(&kev, 0, sizeof(kev)); - kev.ident = fd; - kev.filter = EVFILT_READ; -#ifdef NOTE_EOF - /* Make it behave like select() and poll() */ - kev.fflags = NOTE_EOF; -#endif - kev.flags = EV_ADD; - if (events & EV_ET) - kev.flags |= EV_CLEAR; - - if (kq_insert(kqop, &kev) == -1) - return (-1); - } - - if (events & EV_WRITE) { - memset(&kev, 0, sizeof(kev)); - kev.ident = fd; - kev.filter = EVFILT_WRITE; - kev.flags = EV_ADD; - if (events & EV_ET) - kev.flags |= EV_CLEAR; - - if (kq_insert(kqop, &kev) == -1) - return (-1); - } - - return (0); -} - -static int -kq_del(struct event_base *base, int fd, short old, short events, void *p) -{ - struct kqop *kqop = base->evbase; - struct kevent kev; - (void) p; - - if (events & EV_READ) { - memset(&kev, 0, sizeof(kev)); - kev.ident = fd; - kev.filter = EVFILT_READ; - kev.flags = EV_DELETE; - - if (kq_insert(kqop, &kev) == -1) - return (-1); - } - - if (events & EV_WRITE) { - memset(&kev, 0, sizeof(kev)); - kev.ident = fd; - kev.filter = EVFILT_WRITE; - kev.flags = EV_DELETE; - - if (kq_insert(kqop, &kev) == -1) - return (-1); - } - - return (0); -} - static void kqop_free(struct kqop *kqop) {