Refactor the callback part of an event into its own event_callback type

This shouldn't have any visible effect, but it's necessary or
advisable for a few changes and cleanups I would like to make,
including:
   * Replacing the deferred queue with a type that works more as if it
     were an event.
   * Introducing a useful "activate this on the next round through the
     event loop" state for events and deferreds.
   * Adding an "on until further notice" status for events, to allow a
     saner win32-hybrid approach.
   * Eventually, making all user callbacks first-class things with
     event-like semantics.
Nick Mathewson 2012-04-05 12:38:18 -04:00
parent 981aa0efc9
commit cba59e5325
3 changed files with 153 additions and 91 deletions

event-internal.h

@@ -53,10 +53,16 @@ extern "C" {
#define ev_ncalls ev_.ev_signal.ev_ncalls
#define ev_pncalls ev_.ev_signal.ev_pncalls
/* Possible values for ev_closure in struct event. */
#define EV_CLOSURE_NONE 0
#define EV_CLOSURE_SIGNAL 1
#define EV_CLOSURE_PERSIST 2
#define ev_pri ev_evcallback.evcb_pri
#define ev_flags ev_evcallback.evcb_flags
#define ev_closure ev_evcallback.evcb_closure
#define ev_callback ev_evcallback.evcb_callback
#define ev_arg ev_evcallback.evcb_arg
/* Possible values for evcb_closure in struct event_callback */
#define EV_CLOSURE_EVENT 0
#define EV_CLOSURE_EVENT_SIGNAL 1
#define EV_CLOSURE_EVENT_PERSIST 2
/** Structure to define the backend of a given event_base. */
struct eventop {
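The alias macros above are what make this refactor invisible to the rest of the tree: every old struct event field name (ev_pri, ev_flags, ev_closure, ev_callback, ev_arg) now resolves to the corresponding member of the embedded event_callback. A minimal sketch of the aliasing pattern, using simplified stand-in types rather than the real libevent definitions:

#include <stdio.h>

/* Simplified stand-ins for the structures in this commit. */
struct event_callback {
	short evcb_flags;
	unsigned char evcb_pri;
};
struct event {
	struct event_callback ev_evcallback;  /* callback state, now embedded */
	int ev_fd;
};

/* Same trick as the diff: old field names keep compiling. */
#define ev_pri   ev_evcallback.evcb_pri
#define ev_flags ev_evcallback.evcb_flags

int main(void)
{
	struct event ev = { { 0, 0 }, 3 };
	ev.ev_pri = 1;    /* actually writes ev.ev_evcallback.evcb_pri */
	ev.ev_flags = 0;  /* actually writes ev.ev_evcallback.evcb_flags */
	printf("pri=%d fd=%d\n", ev.ev_pri, ev.ev_fd);
	return 0;
}

Code that only ever touched ev_pri or ev_flags recompiles unchanged, while new code can pass around the embedded struct event_callback on its own.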
@@ -170,6 +176,8 @@ extern int event_debug_mode_on_;
#define EVENT_DEBUG_MODE_IS_ON() (0)
#endif
TAILQ_HEAD(evcallback_list, event_callback);
struct event_base {
/** Function pointers and other data to describe this event_base's
* backend. */
@@ -210,11 +218,11 @@ struct event_base {
int running_loop;
/* Active event management. */
/** An array of nactivequeues queues for active events (ones that
* have triggered, and whose callbacks need to be called). Low
/** An array of nactivequeues queues for active event_callbacks (ones
* that have triggered, and whose callbacks need to be called). Low
* priority numbers are more important, and stall higher ones.
*/
struct event_list *activequeues;
struct evcallback_list *activequeues;
/** The length of the activequeues array */
int nactivequeues;
@@ -266,7 +274,7 @@ struct event_base {
int current_event_waiters;
#endif
/** The event whose callback is executing right now */
struct event *current_event;
struct event_callback *current_event;
#ifdef _WIN32
/** IOCP support structure, if IOCP is enabled. */
@@ -355,6 +363,8 @@ int evsig_restore_handler_(struct event_base *base, int evsignal);
void event_active_nolock_(struct event *ev, int res, short count);
void event_callback_activate_nolock_(struct event_base *, struct event_callback *);
/* FIXME document. */
void event_base_add_virtual_(struct event_base *base);

event.c

@@ -135,10 +135,10 @@ static inline int event_add_internal(struct event *ev,
const struct timeval *tv, int tv_is_absolute);
static inline int event_del_internal(struct event *ev);
static void event_queue_insert_active(struct event_base *, struct event *);
static void event_queue_insert_active(struct event_base *, struct event_callback *);
static void event_queue_insert_timeout(struct event_base *, struct event *);
static void event_queue_insert_inserted(struct event_base *, struct event *);
static void event_queue_remove_active(struct event_base *, struct event *);
static void event_queue_remove_active(struct event_base *, struct event_callback *);
static void event_queue_remove_timeout(struct event_base *, struct event *);
static void event_queue_remove_inserted(struct event_base *, struct event *);
#ifdef USE_REINSERT_TIMEOUT
@@ -424,6 +424,19 @@ event_base_update_cache_time(struct event_base *base)
return 0;
}
static inline struct event *
event_callback_to_event(struct event_callback *evcb)
{
EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_INIT));
return EVUTIL_UPCAST(evcb, struct event, ev_evcallback);
}
static inline struct event_callback *
event_to_event_callback(struct event *ev)
{
return &ev->ev_evcallback;
}
struct event_base *
event_init(void)
{
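event_callback_to_event() recovers the struct event that a given event_callback is embedded in. EVUTIL_UPCAST is libevent's container_of-style macro: it subtracts the member's offset within the containing type from the member's address. A standalone sketch of the same idea (UPCAST here is a local stand-in, not the real macro):

#include <assert.h>
#include <stddef.h>

struct event_callback { short evcb_flags; };
struct event {
	int ev_fd;                            /* gives the member a nonzero offset */
	struct event_callback ev_evcallback;
};

/* container_of in the style of EVUTIL_UPCAST. */
#define UPCAST(ptr, type, field) \
	((type *)(((char *)(ptr)) - offsetof(type, field)))

int main(void)
{
	struct event ev;
	struct event_callback *evcb = &ev.ev_evcallback;
	/* Round trip: a pointer to the member recovers the containing event. */
	assert(UPCAST(evcb, struct event, ev_evcallback) == &ev);
	return 0;
}

This is also why the EVLIST_INIT assertion matters: the upcast is only meaningful when the callback really is the ev_evcallback member of a full struct event, not a free-standing event_callback.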
@@ -777,13 +790,19 @@ event_base_free(struct event_base *base)
mm_free(base->common_timeout_queues);
for (i = 0; i < base->nactivequeues; ++i) {
for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) {
struct event *next = TAILQ_NEXT(ev, ev_active_next);
if (!(ev->ev_flags & EVLIST_INTERNAL)) {
event_del(ev);
++n_deleted;
struct event_callback *evcb, *next;
for (evcb = TAILQ_FIRST(&base->activequeues[i]); evcb; ) {
next = TAILQ_NEXT(evcb, evcb_active_next);
if (evcb->evcb_flags & EVLIST_INIT) {
ev = event_callback_to_event(evcb);
if (!(ev->ev_flags & EVLIST_INTERNAL)) {
event_del(ev);
++n_deleted;
}
} else {
event_queue_remove_active(base, evcb);
}
ev = next;
evcb = next;
}
}
@@ -1093,8 +1112,8 @@ event_base_priority_init(struct event_base *base, int npriorities)
}
/* Allocate our priority queues */
base->activequeues = (struct event_list *)
mm_calloc(npriorities, sizeof(struct event_list));
base->activequeues = (struct evcallback_list *)
mm_calloc(npriorities, sizeof(struct evcallback_list));
if (base->activequeues == NULL) {
event_warn("%s: calloc", __func__);
goto err;
@@ -1393,51 +1412,63 @@ event_persist_closure(struct event_base *base, struct event *ev)
releasing the lock as we go. This function requires that the lock be held
when it's invoked. Returns -1 if we get a signal or an event_break that
means we should stop processing any active events now. Otherwise returns
the number of non-internal events that we processed.
the number of non-internal event_callbacks that we processed.
*/
static int
event_process_active_single_queue(struct event_base *base,
struct event_list *activeq,
struct evcallback_list *activeq,
int max_to_process, const struct timeval *endtime)
{
struct event *ev;
struct event_callback *evcb;
int count = 0;
EVUTIL_ASSERT(activeq != NULL);
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST)
event_queue_remove_active(base, ev);
else
event_del_internal(ev);
if (!(ev->ev_flags & EVLIST_INTERNAL))
for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) {
struct event *ev=NULL;
if (evcb->evcb_flags & EVLIST_INIT) {
ev = event_callback_to_event(evcb);
if (ev->ev_events & EV_PERSIST)
event_queue_remove_active(base, evcb);
else
event_del_internal(ev);
event_debug((
"event_process_active: event: %p, %s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
} else {
event_queue_remove_active(base, evcb);
event_debug(("event_process_active: event_callback %p, "
"closure %d, call %p",
evcb, evcb->evcb_closure, evcb->evcb_callback));
}
if (!(evcb->evcb_flags & EVLIST_INTERNAL))
++count;
event_debug((
"event_process_active: event: %p, %s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
base->current_event = ev;
base->current_event = evcb;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
base->current_event_waiters = 0;
#endif
switch (ev->ev_closure) {
case EV_CLOSURE_SIGNAL:
switch (evcb->evcb_closure) {
case EV_CLOSURE_EVENT_SIGNAL:
event_signal_closure(base, ev);
break;
case EV_CLOSURE_PERSIST:
case EV_CLOSURE_EVENT_PERSIST:
event_persist_closure(base, ev);
break;
default:
case EV_CLOSURE_NONE:
case EV_CLOSURE_EVENT:
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
default:
EVUTIL_ASSERT(0);
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
@@ -1517,7 +1548,7 @@ static int
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
struct evcallback_list *activeq = NULL;
int i, c = 0;
const struct timeval *endtime;
struct timeval tv;
@@ -1865,13 +1896,13 @@ event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short
"EV_READ or EV_WRITE", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_SIGNAL;
ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_PERSIST;
ev->ev_closure = EV_CLOSURE_EVENT_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_NONE;
ev->ev_closure = EV_CLOSURE_EVENT;
}
}
@@ -1922,8 +1953,11 @@ event_base_get_running_event(struct event_base *base)
{
struct event *ev = NULL;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (EVBASE_IN_THREAD(base))
ev = base->current_event;
if (EVBASE_IN_THREAD(base)) {
struct event_callback *evcb = base->current_event;
if (evcb->evcb_flags & EVLIST_INIT)
ev = event_callback_to_event(evcb);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
return ev;
}
@@ -2192,7 +2226,8 @@ event_add_internal(struct event *ev, const struct timeval *tv,
* until the callback is done before we mess with the event, or else
* we can race on ev_ncalls and ev_pncalls below. */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
if (base->current_event == event_to_event_callback(ev) &&
(ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
@@ -2232,7 +2267,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
*
* If tv_is_absolute, this was already set.
*/
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
#ifndef USE_REINSERT_TIMEOUT
@@ -2256,7 +2291,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
}
}
event_queue_remove_active(base, ev);
event_queue_remove_active(base, event_to_event_callback(ev));
}
gettime(base, &now);
@@ -2356,7 +2391,8 @@ event_del_internal(struct event *ev)
* user-supplied argument. */
base = ev->ev_base;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
if (base->current_event == event_to_event_callback(ev) &&
!EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
@@ -2384,7 +2420,7 @@ event_del_internal(struct event *ev)
}
if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove_active(base, ev);
event_queue_remove_active(base, event_to_event_callback(ev));
if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove_inserted(base, ev);
@@ -2452,7 +2488,8 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
if (ev->ev_events & EV_SIGNAL) {
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
if (base->current_event == event_to_event_callback(ev) &&
!EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
@@ -2461,7 +2498,14 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
ev->ev_pncalls = NULL;
}
event_queue_insert_active(base, ev);
event_callback_activate_nolock_(base, event_to_event_callback(ev));
}
void
event_callback_activate_nolock_(struct event_base *base,
struct event_callback *evcb)
{
event_queue_insert_active(base, evcb);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
@@ -2585,13 +2629,13 @@ timeout_process(struct event_base *base)
#error "Mismatch for value of EVLIST_INTERNAL"
#endif
/* These are a fancy way to spell
if (~ev->ev_flags & EVLIST_INTERNAL)
if (~flags & EVLIST_INTERNAL)
base->event_count--/++;
*/
#define DECR_EVENT_COUNT(base,ev) \
((base)->event_count -= (~((ev)->ev_flags >> 4) & 1))
#define INCR_EVENT_COUNT(base, ev) \
((base)->event_count += (~((ev)->ev_flags >> 4) & 1))
#define DECR_EVENT_COUNT(base,flags) \
((base)->event_count -= (~((flags) >> 4) & 1))
#define INCR_EVENT_COUNT(base,flags) \
((base)->event_count += (~((flags) >> 4) & 1))
static void
event_queue_remove_inserted(struct event_base *base, struct event *ev)
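A note on the bit trick in the rewritten macros: the #error guard above pins EVLIST_INTERNAL to 0x10, so (flags >> 4) & 1 isolates the internal bit, and the complement turns that into "1 if not internal". Because the result is masked with & 1, higher flag bits such as EVLIST_INIT (0x80) cannot disturb it. A small self-check (COUNTS is a local stand-in name), assuming the standard libevent flag values:

#include <assert.h>

#define EVLIST_ACTIVE   0x08
#define EVLIST_INTERNAL 0x10   /* (EVLIST_INTERNAL >> 4) == 1, per the guard */
#define EVLIST_INIT     0x80

/* Evaluates to 1 for ordinary entries, 0 for internal ones. */
#define COUNTS(flags) (~((flags) >> 4) & 1)

int main(void)
{
	assert(COUNTS(0) == 1);                               /* plain */
	assert(COUNTS(EVLIST_INTERNAL) == 0);                 /* internal */
	assert(COUNTS(EVLIST_ACTIVE | EVLIST_INTERNAL) == 0);
	assert(COUNTS(EVLIST_INIT | EVLIST_ACTIVE) == 1);     /* high bits masked off */
	return 0;
}

Taking the flag word rather than a struct event * means the same macros now work for bare event_callbacks, which have evcb_flags but no ev_flags field of their own.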
@@ -2602,23 +2646,24 @@ event_queue_remove_inserted(struct event_base *base, struct event *ev)
ev, ev->ev_fd, EVLIST_INSERTED);
return;
}
DECR_EVENT_COUNT(base, ev);
DECR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags &= ~EVLIST_INSERTED;
}
static void
event_queue_remove_active(struct event_base *base, struct event *ev)
event_queue_remove_active(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) {
event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
ev, ev->ev_fd, EVLIST_ACTIVE);
if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) {
event_errx(1, "%s: %p not on queue %x", __func__,
evcb, EVLIST_ACTIVE);
return;
}
DECR_EVENT_COUNT(base, ev);
ev->ev_flags &= ~EVLIST_ACTIVE;
DECR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags &= ~EVLIST_ACTIVE;
base->event_count_active--;
TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
ev, ev_active_next);
TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next);
}
static void
event_queue_remove_timeout(struct event_base *base, struct event *ev)
@@ -2629,7 +2674,7 @@ event_queue_remove_timeout(struct event_base *base, struct event *ev)
ev, ev->ev_fd, EVLIST_TIMEOUT);
return;
}
DECR_EVENT_COUNT(base, ev);
DECR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags &= ~EVLIST_TIMEOUT;
if (is_common_timeout(&ev->ev_timeout, base)) {
@@ -2725,28 +2770,28 @@ event_queue_insert_inserted(struct event_base *base, struct event *ev)
return;
}
INCR_EVENT_COUNT(base, ev);
INCR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags |= EVLIST_INSERTED;
}
static void
event_queue_insert_active(struct event_base *base, struct event *ev)
event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
if (ev->ev_flags & EVLIST_ACTIVE) {
if (evcb->evcb_flags & EVLIST_ACTIVE) {
/* Double insertion is possible for active events */
return;
}
INCR_EVENT_COUNT(base, ev);
INCR_EVENT_COUNT(base, evcb->evcb_flags);
ev->ev_flags |= EVLIST_ACTIVE;
evcb->evcb_flags |= EVLIST_ACTIVE;
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
ev,ev_active_next);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next);
}
static void
@@ -2760,7 +2805,7 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev)
return;
}
INCR_EVENT_COUNT(base, ev);
INCR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags |= EVLIST_TIMEOUT;
@@ -3034,11 +3079,15 @@ event_base_foreach_event_(struct event_base *base,
/* Finally, we deal with all the active events that we haven't touched
* yet. */
for (i = 0; i < base->nactivequeues; ++i) {
TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) {
if (ev->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)) {
/* we already processed this one */
struct event_callback *evcb;
TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) {
/* This isn't an event (evlist_init clear), or
* we already processed it (inserted or
* timeout set). */
continue;
}
ev = event_callback_to_event(evcb);
if ((r = fn(base, ev, arg)))
return r;
}
@@ -3231,11 +3280,11 @@ event_base_assert_ok_(struct event_base *base)
/* Check the active queues. */
for (i = 0; i < base->nactivequeues; ++i) {
struct event *ev;
EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event, ev_active_next);
TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) {
EVUTIL_ASSERT(ev->ev_pri == i);
EVUTIL_ASSERT(ev->ev_flags & EVLIST_ACTIVE);
struct event_callback *evcb;
EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next);
TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE);
EVUTIL_ASSERT(evcb->evcb_pri == i);
}
}

include/event2/event_struct.h

@@ -93,9 +93,19 @@ struct { \
}
#endif /* !TAILQ_ENTRY */
struct event_callback {
TAILQ_ENTRY(event_callback) evcb_active_next;
short evcb_flags;
ev_uint8_t evcb_pri; /* smaller numbers are higher priority */
ev_uint8_t evcb_closure;
/* allows us to adapt for different types of events */
void (*evcb_callback)(evutil_socket_t, short, void *arg);
void *evcb_arg;
};
struct event_base;
struct event {
TAILQ_ENTRY(event) ev_active_next;
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
@@ -124,14 +134,7 @@ struct event {
short ev_events;
short ev_res; /* result passed to event callback */
short ev_flags;
ev_uint8_t ev_pri; /* smaller numbers are higher priority */
ev_uint8_t ev_closure;
struct timeval ev_timeout;
/* allows us to adapt for different types of events */
void (*ev_callback)(evutil_socket_t, short, void *arg);
void *ev_arg;
};
TAILQ_HEAD (event_list, event);
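As the commit message promises, none of this is visible from outside: the public headers expose the same API, and existing field accesses compile through the alias macros. A minimal program using the unchanged public API (ordinary libevent usage, nothing specific to this commit) behaves identically before and after:

#include <event2/event.h>
#include <stdio.h>
#include <sys/time.h>   /* struct timeval (POSIX) */

/* Fires once; with nothing else pending, the dispatch loop then exits. */
static void on_timeout(evutil_socket_t fd, short what, void *arg)
{
	(void)fd; (void)what;
	puts((const char *)arg);
}

int main(void)
{
	struct event_base *base = event_base_new();
	struct event *timer = evtimer_new(base, on_timeout, (void *)"tick");
	struct timeval tv = { 0, 100000 };   /* 100 ms */

	evtimer_add(timer, &tv);
	event_base_dispatch(base);           /* runs the callback, then returns */

	event_free(timer);
	event_base_free(base);
	return 0;
}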