Make "deferred callback queue" independent of event_base.

This way, we can more easily have an IOCP bufferevent implementation
that does not need an event_base at all.  Woot.

svn:r1381
Nick Mathewson 2009-07-26 01:29:39 +00:00
parent 8cc6883524
commit b06b2649b4
7 changed files with 122 additions and 47 deletions
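The effect of the change, roughly: a deferred_cb_queue can now be created and driven without an event_base, with an optional notify hook that tells the queue's owner to drain it. The following sketch is illustrative only, assuming the internal API added in defer-internal.h by this commit; hello_cb, iocp_notify, and example are hypothetical names, not part of the commit.

/* Illustrative sketch: a deferred_cb_queue used without an event_base,
 * built on the internal API this commit adds in defer-internal.h.
 * hello_cb, iocp_notify, and example are hypothetical names. */
#include "defer-internal.h"

static void
hello_cb(struct deferred_cb *cb, void *arg)
{
	/* Runs later, when whoever owns the queue drains it. */
	(void)cb; (void)arg;
}

static void
iocp_notify(struct deferred_cb_queue *queue, void *arg)
{
	/* Wake whatever loop owns the queue -- e.g. post an IOCP
	 * completion -- instead of notifying an event_base. */
	(void)queue; (void)arg;
}

static void
example(void)
{
	struct deferred_cb_queue queue;
	struct deferred_cb cb;

	event_deferred_cb_queue_init(&queue);
	queue.notify_fn = iocp_notify;	/* optional wake-up hook */
	queue.notify_arg = NULL;	/* left NULL here; no lock either */

	event_deferred_cb_init(&cb, hello_cb, NULL);
	event_deferred_cb_schedule(&queue, &cb); /* queues cb, fires notify_fn */
	event_deferred_cb_cancel(&queue, &cb);	 /* or cancel before it runs */
}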

buffer.c

@@ -278,7 +278,7 @@ int
 evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
 {
 	EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
-	buffer->ev_base = base;
+	buffer->cb_queue = event_base_get_deferred_cb_queue(base);
 	buffer->deferred_cbs = 1;
 	event_deferred_cb_init(&buffer->deferred,
 	    evbuffer_deferred_callback, buffer);
@@ -361,7 +361,7 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
 		if (buffer->deferred.queued)
 			return;
 		_evbuffer_incref(buffer);
-		event_deferred_cb_schedule(buffer->ev_base, &buffer->deferred);
+		event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred);
 	} else {
 		evbuffer_run_callbacks(buffer);
 	}
@@ -407,7 +407,7 @@ _evbuffer_decref_and_unlock(struct evbuffer *buffer)
 	}
 	evbuffer_remove_all_callbacks(buffer);
 	if (buffer->deferred_cbs)
-		event_deferred_cb_cancel(buffer->ev_base, &buffer->deferred);
+		event_deferred_cb_cancel(buffer->cb_queue, &buffer->deferred);
 	EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
 	if (buffer->own_lock)

bufferevent.c

@@ -140,6 +140,14 @@ bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg)
 	_bufferevent_decref_and_unlock(bufev);
 }
+
+#define SCHEDULE_DEFERRED(bevp) \
+	do { \
+		event_deferred_cb_schedule( \
+		    event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
+		    &(bevp)->deferred); \
+	} while (0);
+
 
 void
 _bufferevent_run_readcb(struct bufferevent *bufev)
 {
@@ -150,8 +158,7 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
 		p->readcb_pending = 1;
 		if (!p->deferred.queued) {
 			bufferevent_incref(bufev);
-			event_deferred_cb_schedule(
-			    bufev->ev_base, &p->deferred);
+			SCHEDULE_DEFERRED(p);
 		}
 	} else {
 		bufev->readcb(bufev, bufev->cbarg);
@@ -168,8 +175,7 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
 		p->writecb_pending = 1;
 		if (!p->deferred.queued) {
 			bufferevent_incref(bufev);
-			event_deferred_cb_schedule(
-			    bufev->ev_base, &p->deferred);
+			SCHEDULE_DEFERRED(p);
 		}
 	} else {
 		bufev->writecb(bufev, bufev->cbarg);
@@ -187,8 +193,7 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
 		p->errno_pending = EVUTIL_SOCKET_ERROR();
 		if (!p->deferred.queued) {
 			bufferevent_incref(bufev);
-			event_deferred_cb_schedule(
-			    bufev->ev_base, &p->deferred);
+			SCHEDULE_DEFERRED(p);
 		}
 	} else {
 		bufev->errorcb(bufev, what, bufev->cbarg);

defer-internal.h

@@ -50,6 +50,20 @@ struct deferred_cb {
 	void *arg;
 };
 
+struct deferred_cb_queue {
+	void *lock;
+
+	int active_count;
+
+	void (*notify_fn)(struct deferred_cb_queue *, void *);
+	void *notify_arg;
+
+	/** Deferred callback management: a list of deferred callbacks to
+	 * run active the active events. */
+	TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
+};
+
 /**
   Initialize an empty, non-pending deferred_cb.
 
@@ -61,15 +75,32 @@ void event_deferred_cb_init(struct deferred_cb *, deferred_cb_fn, void *);
 /**
   Cancel a deferred_cb if it is currently scheduled in an event_base.
  */
-void event_deferred_cb_cancel(struct event_base *, struct deferred_cb *);
+void event_deferred_cb_cancel(struct deferred_cb_queue *, struct deferred_cb *);
 /**
   Activate a deferred_cb if it is not currently scheduled in an event_base.
  */
-void event_deferred_cb_schedule(struct event_base *, struct deferred_cb *);
+void event_deferred_cb_schedule(struct deferred_cb_queue *, struct deferred_cb *);
+
+#define LOCK_DEFERRED_QUEUE(q) \
+	do { \
+		if ((q)->lock) \
+			_evthread_locking_fn(EVTHREAD_LOCK|EVTHREAD_WRITE, \
+			    (q)->lock); \
+	} while (0)
+
+#define UNLOCK_DEFERRED_QUEUE(q) \
+	do { \
+		if ((q)->lock) \
+			_evthread_locking_fn(EVTHREAD_UNLOCK|EVTHREAD_WRITE, \
+			    (q)->lock); \
+	} while (0)
+
 #ifdef __cplusplus
 }
 #endif
 
+void event_deferred_cb_queue_init(struct deferred_cb_queue *);
+struct deferred_cb_queue *event_base_get_deferred_cb_queue(struct event_base *);
+
 #endif /* _EVENT_INTERNAL_H_ */

evbuffer-internal.h

@@ -115,9 +115,8 @@ struct evbuffer {
 	unsigned is_overlapped : 1;
 #endif
 
-	/** An event_base associated with this evbuffer.  Used to implement
-	 * deferred callbacks. */
-	struct event_base *ev_base;
+	/** Used to implement deferred callbacks. */
+	struct deferred_cb_queue *cb_queue;
 
 	/** For debugging: how many times have we acquired the lock for this
 	 * evbuffer? */

evdns.c

@@ -804,7 +804,9 @@ reply_schedule_callback(struct evdns_request *const req, u32 ttl, u32 err, struc
 	event_deferred_cb_init(&d->deferred, reply_run_callback,
 	    req->user_pointer);
-	event_deferred_cb_schedule(req->base->event_base, &d->deferred);
+	event_deferred_cb_schedule(
+	    event_base_get_deferred_cb_queue(req->base->event_base),
+	    &d->deferred);
 }
 
 /* this processes a parsed reply packet */

event-internal.h

@@ -37,6 +37,7 @@ extern "C" {
 #include "minheap-internal.h"
 #include "evsignal-internal.h"
 #include "mm-internal.h"
+#include "defer-internal.h"
 
 /* map union members back */
@@ -136,9 +137,7 @@ struct event_base {
 	/** The event whose callback is executing right now */
 	struct event *current_event;
 
-	/** Deferred callback management: a list of deferred callbacks to
-	 * run active the active events. */
-	TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
+	struct deferred_cb_queue defer_queue;
 
 	/** Mapping from file descriptors to enabled events */
 	struct event_io_map io;
@@ -209,6 +208,9 @@ struct event_config {
 	} while (0)
 #endif /* TAILQ_FOREACH */
 
+#define N_ACTIVE_CALLBACKS(base) \
+	((base)->event_count_active + (base)->defer_queue.active_count)
+
 int _evsig_set_handler(struct event_base *base, int evsignal,
     void (*fn)(int));
 int _evsig_restore_handler(struct event_base *base, int evsignal);

event.c

@@ -230,6 +230,27 @@ event_base_get_features(struct event_base *base)
 	return base->evsel->features;
 }
 
+void
+event_deferred_cb_queue_init(struct deferred_cb_queue *cb)
+{
+	memset(cb, 0, sizeof(struct deferred_cb_queue));
+	TAILQ_INIT(&cb->deferred_cb_list);
+}
+
+static void
+notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
+{
+	struct event_base *base = baseptr;
+	if (!EVBASE_IN_THREAD(base))
+		evthread_notify_base(base);
+}
+
+struct deferred_cb_queue *
+event_base_get_deferred_cb_queue(struct event_base *base)
+{
+	return base ? &base->defer_queue : NULL;
+}
+
 struct event_base *
 event_base_new_with_config(struct event_config *cfg)
 {
@@ -245,10 +266,13 @@ event_base_new_with_config(struct event_config *cfg)
 	min_heap_ctor(&base->timeheap);
 
 	TAILQ_INIT(&base->eventqueue);
-	TAILQ_INIT(&base->deferred_cb_list);
 	base->sig.ev_signal_pair[0] = -1;
 	base->sig.ev_signal_pair[1] = -1;
 
+	event_deferred_cb_queue_init(&base->defer_queue);
+	base->defer_queue.notify_fn = notify_base_cbq_callback;
+	base->defer_queue.notify_arg = base;
+
 	evmap_io_initmap(&base->io);
 	evmap_signal_initmap(&base->sigmap);
@@ -301,6 +325,7 @@ event_base_new_with_config(struct event_config *cfg)
 	if (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK)) {
 		int r;
 		EVTHREAD_ALLOC_LOCK(base->th_base_lock);
+		base->defer_queue.lock = base->th_base_lock;
 		EVTHREAD_ALLOC_LOCK(base->current_event_lock);
 		r = evthread_make_base_notifiable(base);
 		if (r<0) {
@@ -551,7 +576,7 @@ event_base_priority_init(struct event_base *base, int npriorities)
 {
 	int i;
 
-	if (base->event_count_active || npriorities < 1
+	if (N_ACTIVE_CALLBACKS(base) || npriorities < 1
 	    || npriorities >= EVENT_MAX_PRIORITIES)
 		return (-1);
 
@@ -678,23 +703,23 @@ event_process_active_single_queue(struct event_base *base,
 }
 
 static int
-event_process_deferred_callbacks(struct event_base *base)
+event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
 {
 	int count = 0;
 	struct deferred_cb *cb;
 
-	while ((cb = TAILQ_FIRST(&base->deferred_cb_list))) {
+	while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
 		cb->queued = 0;
-		TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next);
-		--base->event_count_active;
-		EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+		TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
+		--queue->active_count;
+		UNLOCK_DEFERRED_QUEUE(queue);
 
 		cb->cb(cb, cb->arg);
 		++count;
-		if (base->event_break)
+		if (*breakptr)
 			return -1;
-		EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+		LOCK_DEFERRED_QUEUE(queue);
 	}
 	return count;
 }
@@ -727,7 +752,7 @@ event_process_active(struct event_base *base)
 		}
 	}
 
-	event_process_deferred_callbacks(base);
+	event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
 
 unlock:
 	EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
@@ -845,7 +870,7 @@ event_base_loop(struct event_base *base, int flags)
 			timeout_correct(base, &tv);
 
 		tv_p = &tv;
-		if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
+		if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
 			timeout_next(base, &tv_p);
 		} else {
 			/*
@@ -856,7 +881,7 @@ event_base_loop(struct event_base *base, int flags)
 		}
 
 		/* If we have no events, we just exit */
-		if (!event_haveevents(base) && !base->event_count_active) {
+		if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
 			event_debug(("%s: no events registered.", __func__));
 			return (1);
 		}
@@ -875,7 +900,7 @@ event_base_loop(struct event_base *base, int flags)
 		timeout_process(base);
 
-		if (base->event_count_active) {
+		if (N_ACTIVE_CALLBACKS(base)) {
 			event_process_active(base);
 			if (!base->event_count_active && (flags & EVLOOP_ONCE))
 				done = 1;
@@ -1377,34 +1402,45 @@ event_deferred_cb_init(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
 }
 
 void
-event_deferred_cb_cancel(struct event_base *base, struct deferred_cb *cb)
+event_deferred_cb_cancel(struct deferred_cb_queue *queue,
+    struct deferred_cb *cb)
 {
-	if (!base)
-		base = current_base;
+	if (!queue) {
+		if (current_base)
+			queue = &current_base->defer_queue;
+		else
+			return;
+	}
 
-	EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+	LOCK_DEFERRED_QUEUE(queue);
 	if (cb->queued) {
-		TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next);
-		--base->event_count_active;
+		TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
+		--queue->active_count;
 		cb->queued = 0;
 	}
-	EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+	UNLOCK_DEFERRED_QUEUE(queue);
 }
 
 void
-event_deferred_cb_schedule(struct event_base *base, struct deferred_cb *cb)
+event_deferred_cb_schedule(struct deferred_cb_queue *queue,
+    struct deferred_cb *cb)
 {
-	if (!base)
-		base = current_base;
-	EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+	if (!queue) {
+		if (current_base)
+			queue = &current_base->defer_queue;
+		else
+			return;
+	}
+
+	LOCK_DEFERRED_QUEUE(queue);
 	if (!cb->queued) {
 		cb->queued = 1;
-		TAILQ_INSERT_TAIL(&base->deferred_cb_list, cb, cb_next);
-		++base->event_count_active;
-		if (!EVBASE_IN_THREAD(base))
-			evthread_notify_base(base);
+		TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
+		++queue->active_count;
+		if (queue->notify_fn)
+			queue->notify_fn(queue, queue->notify_arg);
 	}
-	EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+	UNLOCK_DEFERRED_QUEUE(queue);
 }
 
 static int