support for event priorities; active events are scheduled into priority queues;

lower priorities get always processed before higher priorities


svn:r120
This commit is contained in:
Niels Provos 2004-09-19 21:08:09 +00:00
parent 6df2ede5f5
commit fa6c304d9e
5 changed files with 239 additions and 13 deletions

View File

@ -225,6 +225,17 @@ bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
return (bufev);
}
int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
if (event_priority_set(&bufev->ev_read, priority) == -1)
return (-1);
if (event_priority_set(&bufev->ev_write, priority) == -1)
return (-1);
return (0);
}
void
bufferevent_free(struct bufferevent *bufev)
{

31
event.3
View File

@ -40,6 +40,8 @@
.Nm event_once ,
.Nm event_pending ,
.Nm event_initialized ,
.Nm event_priority_init ,
.Nm event_priority_set ,
.Nm evtimer_set ,
.Nm evtimer_add ,
.Nm evtimer_del
@ -91,6 +93,10 @@
.Fn "event_pending" "struct event *ev" "short event" "struct timeval *tv"
.Ft int
.Fn "event_initialized" "struct event *ev"
.Ft int
.Fn "event_priority_init" "int npriorities"
.Ft int
.Fn "event_priority_set" "struct event *ev" "int priority"
.Ft void
.Fn "evtimer_set" "struct event *ev" "void (*fn)(int, short, void *)" "void *arg"
.Ft void
@ -368,6 +374,31 @@ By setting the environment variable
.Nm libevent
displays the kernel notification method that it uses.
.Pp
.Sh EVENT PRIORITIES
By default
.Nm libevent
schedules all active events with the same priority.
However, sometime it is desirable to process some events with a higher
priority than others.
For that reason,
.Nm libevent
supports strict priority queues.
Active events with a lower priority are always processed before events
with a higher priority.
.Pp
The number of different priorities can be set initially with the
.Fn event_priority_init
function.
This function should be called before the first call to
.Fn event_dispatch .
The
.Fn event_priority_set
function can be used to assign a priority to an event.
By default,
.Nm libevent
assigns the middle priority to all events unless their priority
is explicitly set.
.Pp
.Sh BUFFERED EVENTS
.Nm libevent
provides an abstraction on top of the regular event callbacks.

122
event.c
View File

@ -113,7 +113,8 @@ const struct eventop *eventops[] = {
const struct eventop *evsel;
void *evbase;
static int event_count;
static int event_count; /* counts number of total events */
static int event_count_active; /* counts number of active events */
/* Handle signals - This is a deprecated interface */
int (*event_sigcb)(void); /* Signal callback when gotsig is set */
@ -128,7 +129,11 @@ int event_haveevents(void);
static void event_process_active(void);
static RB_HEAD(event_tree, event) timetree;
static struct event_list activequeue;
/* active event management */
static struct event_list **activequeues;
static int nactivequeues;
struct event_list signalqueue;
struct event_list eventqueue;
static struct timeval event_tv;
@ -168,7 +173,6 @@ event_init(void)
RB_INIT(&timetree);
TAILQ_INIT(&eventqueue);
TAILQ_INIT(&activequeue);
TAILQ_INIT(&signalqueue);
evbase = NULL;
@ -183,6 +187,41 @@ event_init(void)
if (getenv("EVENT_SHOW_METHOD"))
fprintf(stderr, "libevent using: %s\n", evsel->name);
/* allocate a single active event queue */
event_priority_init(1);
}
int
event_priority_init(int npriorities)
{
int i;
if (event_count_active)
return (-1);
if (nactivequeues && npriorities != nactivequeues) {
for (i = 0; i < nactivequeues; ++i) {
free(activequeues[i]);
}
free(activequeues);
}
/* Allocate our priority queues */
nactivequeues = npriorities;
activequeues = (struct event_list **)calloc(nactivequeues,
npriorities * sizeof(struct event_list *));
if (activequeues == NULL)
err(1, "%s: calloc", __func__);
for (i = 0; i < nactivequeues; ++i) {
activequeues[i] = malloc(sizeof(struct event_list));
if (activequeues[i] == NULL)
err(1, "%s: malloc", __func__);
TAILQ_INIT(activequeues[i]);
}
return (0);
}
int
@ -191,14 +230,31 @@ event_haveevents(void)
return (event_count > 0);
}
/*
* Active events are stored in priority queues. Lower priorities are always
* process before higher priorities. Low priority events can starve high
* priority ones.
*/
static void
event_process_active(void)
{
struct event *ev;
struct event_list *activeq = NULL;
int i;
short ncalls;
for (ev = TAILQ_FIRST(&activequeue); ev;
ev = TAILQ_FIRST(&activequeue)) {
if (!event_count_active)
return;
for (i = 0; i < nactivequeues; ++i) {
if (TAILQ_FIRST(activequeues[i]) != NULL) {
activeq = activequeues[i];
break;
}
}
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
event_queue_remove(ev, EVLIST_ACTIVE);
/* Allows deletes to work */
@ -276,7 +332,7 @@ event_loop(int flags)
}
event_tv = tv;
if (!(flags & EVLOOP_NONBLOCK))
if (!event_count_active && !(flags & EVLOOP_NONBLOCK))
timeout_next(&tv);
else
timerclear(&tv);
@ -292,9 +348,9 @@ event_loop(int flags)
timeout_process();
if (TAILQ_FIRST(&activequeue)) {
if (event_count_active) {
event_process_active();
if (flags & EVLOOP_ONCE)
if (!event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
@ -382,6 +438,27 @@ event_set(struct event *ev, int fd, short events,
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
/* by default, we put new events into the middle priority */
ev->ev_pri = nactivequeues/2;
}
/*
* Set's the priority of an event - if an event is already scheduled
* changing the priority is going to fail.
*/
int
event_priority_set(struct event *ev, int pri)
{
if (ev->ev_flags & EVLIST_ACTIVE)
return (-1);
if (pri < 0 || pri >= nactivequeues)
return (-1);
ev->ev_pri = pri;
return (0);
}
/*
@ -587,17 +664,24 @@ timeout_process(void)
void
event_queue_remove(struct event *ev, int queue)
{
int docount = 1;
if (!(ev->ev_flags & queue))
errx(1, "%s: %p(fd %d) not on queue %x", __func__,
ev, ev->ev_fd, queue);
if (!(ev->ev_flags & EVLIST_INTERNAL))
if (ev->ev_flags & EVLIST_INTERNAL)
docount = 0;
if (docount)
event_count--;
ev->ev_flags &= ~queue;
switch (queue) {
case EVLIST_ACTIVE:
TAILQ_REMOVE(&activequeue, ev, ev_active_next);
if (docount)
event_count_active--;
TAILQ_REMOVE(activequeues[ev->ev_pri], ev, ev_active_next);
break;
case EVLIST_SIGNAL:
TAILQ_REMOVE(&signalqueue, ev, ev_signal_next);
@ -616,17 +700,29 @@ event_queue_remove(struct event *ev, int queue)
void
event_queue_insert(struct event *ev, int queue)
{
if (ev->ev_flags & queue)
int docount = 1;
if (ev->ev_flags & queue) {
/* Double insertion is possible for active events */
if (queue & EVLIST_ACTIVE)
return;
errx(1, "%s: %p(fd %d) already on queue %x", __func__,
ev, ev->ev_fd, queue);
}
if (!(ev->ev_flags & EVLIST_INTERNAL))
if (ev->ev_flags & EVLIST_INTERNAL)
docount = 0;
if (docount)
event_count++;
ev->ev_flags |= queue;
switch (queue) {
case EVLIST_ACTIVE:
TAILQ_INSERT_TAIL(&activequeue, ev, ev_active_next);
if (docount)
event_count_active++;
TAILQ_INSERT_TAIL(activequeues[ev->ev_pri], ev,ev_active_next);
break;
case EVLIST_SIGNAL:
TAILQ_INSERT_TAIL(&signalqueue, ev, ev_signal_next);

View File

@ -89,6 +89,8 @@ struct event {
struct timeval ev_timeout;
int ev_pri; /* smaller numbers are higher priority */
void (*ev_callback)(int, short, void *arg);
void *ev_arg;
@ -167,6 +169,11 @@ int event_pending(struct event *, short, struct timeval *);
#define event_initialized(ev) ((ev)->ev_flags & EVLIST_INIT)
#endif
/* These functions deal with event priorities */
int event_priority_init(int);
int event_priority_set(struct event *, int);
/* These functions deal with buffering input and output */
struct evbuffer {
@ -220,6 +227,7 @@ struct bufferevent {
struct bufferevent *bufferevent_new(int fd,
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg);
int bufferevent_priority_set(struct bufferevent *bufev, int pri);
void bufferevent_free(struct bufferevent *bufev);
int bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);

View File

@ -529,6 +529,82 @@ test9(void)
cleanup_test();
}
struct test_pri_event {
struct event ev;
int count;
};
void
test_priorities_cb(int fd, short what, void *arg)
{
struct test_pri_event *pri = arg;
struct timeval tv;
if (pri->count == 3) {
event_loopexit(NULL);
return;
}
pri->count++;
timerclear(&tv);
event_add(&pri->ev, &tv);
}
void
test_priorities(int npriorities)
{
char buf[32];
struct test_pri_event one, two;
struct timeval tv;
snprintf(buf, sizeof(buf), "Priorities %d: ", npriorities);
setup_test(buf);
event_priority_init(npriorities);
memset(&one, 0, sizeof(one));
memset(&two, 0, sizeof(two));
timeout_set(&one.ev, test_priorities_cb, &one);
if (event_priority_set(&one.ev, 0) == -1) {
fprintf(stderr, "%s: failed to set priority", __func__);
exit(1);
}
timeout_set(&two.ev, test_priorities_cb, &two);
if (event_priority_set(&two.ev, npriorities - 1) == -1) {
fprintf(stderr, "%s: failed to set priority", __func__);
exit(1);
}
timerclear(&tv);
if (event_add(&one.ev, &tv) == -1)
exit(1);
if (event_add(&two.ev, &tv) == -1)
exit(1);
event_dispatch();
event_del(&one.ev);
event_del(&two.ev);
if (npriorities == 1) {
if (one.count == 3 && two.count == 3)
test_ok = 1;
} else if (npriorities == 2) {
/* Two is called once because event_loopexit is priority 1 */
if (one.count == 3 && two.count == 1)
test_ok = 1;
} else {
if (one.count == 3 && two.count == 0)
test_ok = 1;
}
cleanup_test();
}
int
main (int argc, char **argv)
{
@ -565,6 +641,10 @@ main (int argc, char **argv)
test9();
test_priorities(1);
test_priorities(2);
test_priorities(3);
return (0);
}