From fa6c304d9e11982058ea8fc1b6f970924c599f52 Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Sun, 19 Sep 2004 21:08:09 +0000 Subject: [PATCH] support for event priorities; active events are scheduled into priority queues; lower priorities get always processed before higher priorities svn:r120 --- evbuffer.c | 11 +++++ event.3 | 31 +++++++++++++ event.c | 122 +++++++++++++++++++++++++++++++++++++++++++------ event.h | 8 ++++ test/regress.c | 80 ++++++++++++++++++++++++++++++++ 5 files changed, 239 insertions(+), 13 deletions(-) diff --git a/evbuffer.c b/evbuffer.c index 0e7481ea..b8254d86 100644 --- a/evbuffer.c +++ b/evbuffer.c @@ -225,6 +225,17 @@ bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, return (bufev); } +int +bufferevent_priority_set(struct bufferevent *bufev, int priority) +{ + if (event_priority_set(&bufev->ev_read, priority) == -1) + return (-1); + if (event_priority_set(&bufev->ev_write, priority) == -1) + return (-1); + + return (0); +} + void bufferevent_free(struct bufferevent *bufev) { diff --git a/event.3 b/event.3 index ec4cd7c4..c263a494 100644 --- a/event.3 +++ b/event.3 @@ -40,6 +40,8 @@ .Nm event_once , .Nm event_pending , .Nm event_initialized , +.Nm event_priority_init , +.Nm event_priority_set , .Nm evtimer_set , .Nm evtimer_add , .Nm evtimer_del @@ -91,6 +93,10 @@ .Fn "event_pending" "struct event *ev" "short event" "struct timeval *tv" .Ft int .Fn "event_initialized" "struct event *ev" +.Ft int +.Fn "event_priority_init" "int npriorities" +.Ft int +.Fn "event_priority_set" "struct event *ev" "int priority" .Ft void .Fn "evtimer_set" "struct event *ev" "void (*fn)(int, short, void *)" "void *arg" .Ft void @@ -368,6 +374,31 @@ By setting the environment variable .Nm libevent displays the kernel notification method that it uses. .Pp +.Sh EVENT PRIORITIES +By default +.Nm libevent +schedules all active events with the same priority. +However, sometime it is desirable to process some events with a higher +priority than others. +For that reason, +.Nm libevent +supports strict priority queues. +Active events with a lower priority are always processed before events +with a higher priority. +.Pp +The number of different priorities can be set initially with the +.Fn event_priority_init +function. +This function should be called before the first call to +.Fn event_dispatch . +The +.Fn event_priority_set +function can be used to assign a priority to an event. +By default, +.Nm libevent +assigns the middle priority to all events unless their priority +is explicitly set. +.Pp .Sh BUFFERED EVENTS .Nm libevent provides an abstraction on top of the regular event callbacks. diff --git a/event.c b/event.c index 66cf8a6a..c62574e3 100644 --- a/event.c +++ b/event.c @@ -113,7 +113,8 @@ const struct eventop *eventops[] = { const struct eventop *evsel; void *evbase; -static int event_count; +static int event_count; /* counts number of total events */ +static int event_count_active; /* counts number of active events */ /* Handle signals - This is a deprecated interface */ int (*event_sigcb)(void); /* Signal callback when gotsig is set */ @@ -128,7 +129,11 @@ int event_haveevents(void); static void event_process_active(void); static RB_HEAD(event_tree, event) timetree; -static struct event_list activequeue; + +/* active event management */ +static struct event_list **activequeues; +static int nactivequeues; + struct event_list signalqueue; struct event_list eventqueue; static struct timeval event_tv; @@ -168,7 +173,6 @@ event_init(void) RB_INIT(&timetree); TAILQ_INIT(&eventqueue); - TAILQ_INIT(&activequeue); TAILQ_INIT(&signalqueue); evbase = NULL; @@ -183,6 +187,41 @@ event_init(void) if (getenv("EVENT_SHOW_METHOD")) fprintf(stderr, "libevent using: %s\n", evsel->name); + + /* allocate a single active event queue */ + event_priority_init(1); +} + +int +event_priority_init(int npriorities) +{ + int i; + + if (event_count_active) + return (-1); + + if (nactivequeues && npriorities != nactivequeues) { + for (i = 0; i < nactivequeues; ++i) { + free(activequeues[i]); + } + free(activequeues); + } + + /* Allocate our priority queues */ + nactivequeues = npriorities; + activequeues = (struct event_list **)calloc(nactivequeues, + npriorities * sizeof(struct event_list *)); + if (activequeues == NULL) + err(1, "%s: calloc", __func__); + + for (i = 0; i < nactivequeues; ++i) { + activequeues[i] = malloc(sizeof(struct event_list)); + if (activequeues[i] == NULL) + err(1, "%s: malloc", __func__); + TAILQ_INIT(activequeues[i]); + } + + return (0); } int @@ -191,14 +230,31 @@ event_haveevents(void) return (event_count > 0); } +/* + * Active events are stored in priority queues. Lower priorities are always + * process before higher priorities. Low priority events can starve high + * priority ones. + */ + static void event_process_active(void) { struct event *ev; + struct event_list *activeq = NULL; + int i; short ncalls; - for (ev = TAILQ_FIRST(&activequeue); ev; - ev = TAILQ_FIRST(&activequeue)) { + if (!event_count_active) + return; + + for (i = 0; i < nactivequeues; ++i) { + if (TAILQ_FIRST(activequeues[i]) != NULL) { + activeq = activequeues[i]; + break; + } + } + + for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { event_queue_remove(ev, EVLIST_ACTIVE); /* Allows deletes to work */ @@ -276,7 +332,7 @@ event_loop(int flags) } event_tv = tv; - if (!(flags & EVLOOP_NONBLOCK)) + if (!event_count_active && !(flags & EVLOOP_NONBLOCK)) timeout_next(&tv); else timerclear(&tv); @@ -292,9 +348,9 @@ event_loop(int flags) timeout_process(); - if (TAILQ_FIRST(&activequeue)) { + if (event_count_active) { event_process_active(); - if (flags & EVLOOP_ONCE) + if (!event_count_active && (flags & EVLOOP_ONCE)) done = 1; } else if (flags & EVLOOP_NONBLOCK) done = 1; @@ -382,6 +438,27 @@ event_set(struct event *ev, int fd, short events, ev->ev_flags = EVLIST_INIT; ev->ev_ncalls = 0; ev->ev_pncalls = NULL; + + /* by default, we put new events into the middle priority */ + ev->ev_pri = nactivequeues/2; +} + +/* + * Set's the priority of an event - if an event is already scheduled + * changing the priority is going to fail. + */ + +int +event_priority_set(struct event *ev, int pri) +{ + if (ev->ev_flags & EVLIST_ACTIVE) + return (-1); + if (pri < 0 || pri >= nactivequeues) + return (-1); + + ev->ev_pri = pri; + + return (0); } /* @@ -587,17 +664,24 @@ timeout_process(void) void event_queue_remove(struct event *ev, int queue) { + int docount = 1; + if (!(ev->ev_flags & queue)) errx(1, "%s: %p(fd %d) not on queue %x", __func__, ev, ev->ev_fd, queue); - if (!(ev->ev_flags & EVLIST_INTERNAL)) + if (ev->ev_flags & EVLIST_INTERNAL) + docount = 0; + + if (docount) event_count--; ev->ev_flags &= ~queue; switch (queue) { case EVLIST_ACTIVE: - TAILQ_REMOVE(&activequeue, ev, ev_active_next); + if (docount) + event_count_active--; + TAILQ_REMOVE(activequeues[ev->ev_pri], ev, ev_active_next); break; case EVLIST_SIGNAL: TAILQ_REMOVE(&signalqueue, ev, ev_signal_next); @@ -616,17 +700,29 @@ event_queue_remove(struct event *ev, int queue) void event_queue_insert(struct event *ev, int queue) { - if (ev->ev_flags & queue) + int docount = 1; + + if (ev->ev_flags & queue) { + /* Double insertion is possible for active events */ + if (queue & EVLIST_ACTIVE) + return; + errx(1, "%s: %p(fd %d) already on queue %x", __func__, ev, ev->ev_fd, queue); + } - if (!(ev->ev_flags & EVLIST_INTERNAL)) + if (ev->ev_flags & EVLIST_INTERNAL) + docount = 0; + + if (docount) event_count++; ev->ev_flags |= queue; switch (queue) { case EVLIST_ACTIVE: - TAILQ_INSERT_TAIL(&activequeue, ev, ev_active_next); + if (docount) + event_count_active++; + TAILQ_INSERT_TAIL(activequeues[ev->ev_pri], ev,ev_active_next); break; case EVLIST_SIGNAL: TAILQ_INSERT_TAIL(&signalqueue, ev, ev_signal_next); diff --git a/event.h b/event.h index 875560c6..83230bf7 100644 --- a/event.h +++ b/event.h @@ -89,6 +89,8 @@ struct event { struct timeval ev_timeout; + int ev_pri; /* smaller numbers are higher priority */ + void (*ev_callback)(int, short, void *arg); void *ev_arg; @@ -167,6 +169,11 @@ int event_pending(struct event *, short, struct timeval *); #define event_initialized(ev) ((ev)->ev_flags & EVLIST_INIT) #endif +/* These functions deal with event priorities */ + +int event_priority_init(int); +int event_priority_set(struct event *, int); + /* These functions deal with buffering input and output */ struct evbuffer { @@ -220,6 +227,7 @@ struct bufferevent { struct bufferevent *bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg); +int bufferevent_priority_set(struct bufferevent *bufev, int pri); void bufferevent_free(struct bufferevent *bufev); int bufferevent_write(struct bufferevent *bufev, void *data, size_t size); int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf); diff --git a/test/regress.c b/test/regress.c index a6547e19..20ef0b40 100644 --- a/test/regress.c +++ b/test/regress.c @@ -529,6 +529,82 @@ test9(void) cleanup_test(); } +struct test_pri_event { + struct event ev; + int count; +}; + +void +test_priorities_cb(int fd, short what, void *arg) +{ + struct test_pri_event *pri = arg; + struct timeval tv; + + if (pri->count == 3) { + event_loopexit(NULL); + return; + } + + pri->count++; + + timerclear(&tv); + event_add(&pri->ev, &tv); +} + +void +test_priorities(int npriorities) +{ + char buf[32]; + struct test_pri_event one, two; + struct timeval tv; + + snprintf(buf, sizeof(buf), "Priorities %d: ", npriorities); + setup_test(buf); + + event_priority_init(npriorities); + + memset(&one, 0, sizeof(one)); + memset(&two, 0, sizeof(two)); + + timeout_set(&one.ev, test_priorities_cb, &one); + if (event_priority_set(&one.ev, 0) == -1) { + fprintf(stderr, "%s: failed to set priority", __func__); + exit(1); + } + + timeout_set(&two.ev, test_priorities_cb, &two); + if (event_priority_set(&two.ev, npriorities - 1) == -1) { + fprintf(stderr, "%s: failed to set priority", __func__); + exit(1); + } + + timerclear(&tv); + + if (event_add(&one.ev, &tv) == -1) + exit(1); + if (event_add(&two.ev, &tv) == -1) + exit(1); + + event_dispatch(); + + event_del(&one.ev); + event_del(&two.ev); + + if (npriorities == 1) { + if (one.count == 3 && two.count == 3) + test_ok = 1; + } else if (npriorities == 2) { + /* Two is called once because event_loopexit is priority 1 */ + if (one.count == 3 && two.count == 1) + test_ok = 1; + } else { + if (one.count == 3 && two.count == 0) + test_ok = 1; + } + + cleanup_test(); +} + int main (int argc, char **argv) { @@ -565,6 +641,10 @@ main (int argc, char **argv) test9(); + test_priorities(1); + test_priorities(2); + test_priorities(3); + return (0); }