Restore our priority-inversion-prevention code with deferreds

Back when deferred_cb stuff had its own queue, the queue was always
executed, but we never ran more than 16 callbacks per iteration.
That made for two problems:

1: Because deferred_cb stuff would always run, and had no priority,
it could cause priority inversion.

2: It doesn't respect the max_dispatch_interval code.

Then, when I refactored deferred_cb to be a special case of
event_callback, that solved the above issues, but made for two more
issues:

3: Because deferred_cb stuff would always get the default priority,
it could could low-priority bufferevents to get too much priority.

4: With code like bufferevent_pair, it's easy to get into a
situation where two deferreds keep adding one another, preventing
the event loop from ever actually scanning for more events.

This commit fixes the above by giving deferreds a better notion of
priorities, and by limiting the number of deferreds that can be
added to the _current_ loop iteration's active queues.  (Extra
deferreds are put into the active_later state.)

That isn't an all-purpose priority inversion solution, of course: for
that, you may need to mess around with max_dispatch_interval.
This commit is contained in:
Nick Mathewson 2012-05-09 11:06:06 -04:00
parent 581b5beb98
commit c0e425abdc
11 changed files with 80 additions and 23 deletions

View File

@ -404,7 +404,8 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
EVBUFFER_LOCK(buffer); EVBUFFER_LOCK(buffer);
buffer->cb_queue = base; buffer->cb_queue = base;
buffer->deferred_cbs = 1; buffer->deferred_cbs = 1;
event_deferred_cb_init_(base, &buffer->deferred, event_deferred_cb_init_(&buffer->deferred,
event_base_get_npriorities(base) / 2,
evbuffer_deferred_callback, buffer); evbuffer_deferred_callback, buffer);
EVBUFFER_UNLOCK(buffer); EVBUFFER_UNLOCK(buffer);
return 0; return 0;

View File

@ -324,14 +324,14 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS) if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_( event_deferred_cb_init_(
bufev->ev_base,
&bufev_private->deferred, &bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_unlocked, bufferevent_run_deferred_callbacks_unlocked,
bufev_private); bufev_private);
else else
event_deferred_cb_init_( event_deferred_cb_init_(
bufev->ev_base,
&bufev_private->deferred, &bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked, bufferevent_run_deferred_callbacks_locked,
bufev_private); bufev_private);
} }

View File

@ -260,8 +260,9 @@ be_pair_disable(struct bufferevent *bev, short events)
if (events & EV_READ) { if (events & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev); BEV_DEL_GENERIC_READ_TIMEOUT(bev);
} }
if (events & EV_WRITE) if (events & EV_WRITE) {
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
}
return 0; return 0;
} }

View File

@ -636,6 +636,8 @@ int
bufferevent_priority_set(struct bufferevent *bufev, int priority) bufferevent_priority_set(struct bufferevent *bufev, int priority)
{ {
int r = -1; int r = -1;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev); BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket) if (bufev->be_ops != &bufferevent_ops_socket)
@ -646,6 +648,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority)
if (event_priority_set(&bufev->ev_write, priority) == -1) if (event_priority_set(&bufev->ev_write, priority) == -1)
goto done; goto done;
event_deferred_cb_set_priority_(&bufev_p->deferred, priority);
r = 0; r = 0;
done: done:
BEV_UNLOCK(bufev); BEV_UNLOCK(bufev);

View File

@ -42,10 +42,15 @@ typedef void (*deferred_cb_fn)(struct event_callback *, void *);
Initialize an empty, non-pending event_callback. Initialize an empty, non-pending event_callback.
@param deferred The struct event_callback structure to initialize. @param deferred The struct event_callback structure to initialize.
@param priority The priority that the callback should run at.
@param cb The function to run when the struct event_callback executes. @param cb The function to run when the struct event_callback executes.
@param arg The function's second argument. @param arg The function's second argument.
*/ */
void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *); void event_deferred_cb_init_(struct event_callback *, ev_uint8_t, deferred_cb_fn, void *);
/**
Change the priority of a non-pending event_callback.
*/
void event_deferred_cb_set_priority_(struct event_callback *, ev_uint8_t);
/** /**
Cancel a struct event_callback if it is currently scheduled in an event_base. Cancel a struct event_callback if it is currently scheduled in an event_base.
*/ */

View File

@ -836,8 +836,10 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl
d->handle = req->handle; d->handle = req->handle;
} }
event_deferred_cb_init_(req->base->event_base, event_deferred_cb_init_(
&d->deferred, reply_run_callback, &d->deferred,
event_get_priority(&req->timeout_event),
reply_run_callback,
req->user_pointer); req->user_pointer);
event_deferred_cb_schedule_( event_deferred_cb_schedule_(
req->base->event_base, req->base->event_base,

View File

@ -218,6 +218,12 @@ struct event_base {
* reentrant invocation. */ * reentrant invocation. */
int running_loop; int running_loop;
/** Set to the number of deferred_cbs we've made 'active' in the
* loop. This is a hack to prevent starvation; it would be smarter
* to just use event_config_set_max_dispatch_interval's max_callbacks
* feature */
int n_deferreds_queued;
/* Active event management. */ /* Active event management. */
/** An array of nactivequeues queues for active event_callbacks (ones /** An array of nactivequeues queues for active event_callbacks (ones
* that have triggered, and whose callbacks need to be called). Low * that have triggered, and whose callbacks need to be called). Low

36
event.c
View File

@ -1072,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg,
cfg->max_dispatch_interval.tv_sec = -1; cfg->max_dispatch_interval.tv_sec = -1;
cfg->max_dispatch_callbacks = cfg->max_dispatch_callbacks =
max_callbacks >= 0 ? max_callbacks : INT_MAX; max_callbacks >= 0 ? max_callbacks : INT_MAX;
if (min_priority <= 0) if (min_priority < 0)
min_priority = 1; min_priority = 0;
cfg->limit_callbacks_after_prio = min_priority; cfg->limit_callbacks_after_prio = min_priority;
return (0); return (0);
} }
@ -1683,6 +1683,7 @@ event_base_loop(struct event_base *base, int flags)
while (!done) { while (!done) {
base->event_continue = 0; base->event_continue = 0;
base->n_deferreds_queued = 0;
/* Terminate the loop if we have been asked to */ /* Terminate the loop if we have been asked to */
if (base->event_gotterm) { if (base->event_gotterm) {
@ -2593,21 +2594,28 @@ event_callback_cancel_nolock_(struct event_base *base,
case 0: case 0:
break; break;
} }
event_base_assert_ok_(base);
return 0; return 0;
} }
void void
event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg) event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg)
{ {
if (!base)
base = current_base;
memset(cb, 0, sizeof(*cb)); memset(cb, 0, sizeof(*cb));
cb->evcb_cb_union.evcb_selfcb = fn; cb->evcb_cb_union.evcb_selfcb = fn;
cb->evcb_arg = arg; cb->evcb_arg = arg;
cb->evcb_pri = base->nactivequeues - 1; cb->evcb_pri = priority;
cb->evcb_closure = EV_CLOSURE_CB_SELF; cb->evcb_closure = EV_CLOSURE_CB_SELF;
} }
void
event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority)
{
cb->evcb_pri = priority;
}
void void
event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
{ {
@ -2616,12 +2624,22 @@ event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
event_callback_cancel_(base, cb); event_callback_cancel_(base, cb);
} }
#define MAX_DEFERREDS_QUEUED 32
int int
event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb) event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb)
{ {
int r = 1;
if (!base) if (!base)
base = current_base; base = current_base;
return event_callback_activate_(base, cb); EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) {
event_callback_activate_later_nolock_(base, cb);
} else {
++base->n_deferreds_queued;
r = event_callback_activate_nolock_(base, cb);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
return r;
} }
static int static int
@ -2868,6 +2886,7 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
evcb->evcb_flags |= EVLIST_ACTIVE; evcb->evcb_flags |= EVLIST_ACTIVE;
base->event_count_active++; base->event_count_active++;
EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next); evcb, evcb_active_next);
} }
@ -2884,6 +2903,7 @@ event_queue_insert_active_later(struct event_base *base, struct event_callback *
INCR_EVENT_COUNT(base, evcb->evcb_flags); INCR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags |= EVLIST_ACTIVE_LATER; evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
base->event_count_active++; base->event_count_active++;
EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next); TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
} }
@ -2920,7 +2940,9 @@ event_queue_make_later_events_active(struct event_base *base)
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) { while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next); TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE; evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF);
} }
} }

3
http.c
View File

@ -2197,8 +2197,9 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas
bufferevent_base_set(base, evcon->bufev); bufferevent_base_set(base, evcon->bufev);
} }
event_deferred_cb_init_(evcon->base, event_deferred_cb_init_(
&evcon->read_more_deferred_cb, &evcon->read_more_deferred_cb,
bufferevent_get_priority(bev),
evhttp_deferred_read_cb, evcon); evhttp_deferred_read_cb, evcon);
evcon->dns_base = dnsbase; evcon->dns_base = dnsbase;

View File

@ -498,7 +498,8 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family)
res->family = family; res->family = family;
event_deferred_cb_init_(&res->deferred, event_deferred_cb_init_(&res->deferred,
accepted_socket_invoke_user_cb, res); event_base_get_npriorities(base) / 2,
accepted_socket_invoke_user_cb, res);
InitializeCriticalSectionAndSpinCount(&res->lock, 1000); InitializeCriticalSectionAndSpinCount(&res->lock, 1000);

View File

@ -439,7 +439,8 @@ load_deferred_queue(void *arg)
size_t i; size_t i;
for (i = 0; i < CB_COUNT; ++i) { for (i = 0; i < CB_COUNT; ++i) {
event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL); event_deferred_cb_init_(&data->cbs[i], 0, deferred_callback,
NULL);
event_deferred_cb_schedule_(data->queue, &data->cbs[i]); event_deferred_cb_schedule_(data->queue, &data->cbs[i]);
SLEEP_MS(1); SLEEP_MS(1);
} }
@ -469,20 +470,28 @@ thread_deferred_cb_skew(void *arg)
{ {
struct basic_test_data *data = arg; struct basic_test_data *data = arg;
struct timeval tv_timer = {1, 0}; struct timeval tv_timer = {1, 0};
struct event_base *queue = data->base; struct event_base *base = NULL;
struct event_config *cfg = NULL;
struct timeval elapsed; struct timeval elapsed;
int elapsed_usec; int elapsed_usec;
int i; int i;
cfg = event_config_new();
tt_assert(cfg);
event_config_set_max_dispatch_interval(cfg, NULL, 16, 0);
base = event_base_new_with_config(cfg);
tt_assert(base);
for (i = 0; i < QUEUE_THREAD_COUNT; ++i) for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
deferred_data[i].queue = queue; deferred_data[i].queue = base;
evutil_gettimeofday(&timer_start, NULL); evutil_gettimeofday(&timer_start, NULL);
event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, event_base_once(base, -1, EV_TIMEOUT, timer_callback, NULL,
&tv_timer); &tv_timer);
event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, event_base_once(base, -1, EV_TIMEOUT, start_threads_callback,
NULL, NULL); NULL, NULL);
event_base_dispatch(data->base); event_base_dispatch(base);
evutil_timersub(&timer_end, &timer_start, &elapsed); evutil_timersub(&timer_end, &timer_start, &elapsed);
TT_BLATHER(("callback count, %u", callback_count)); TT_BLATHER(("callback count, %u", callback_count));
@ -497,6 +506,10 @@ thread_deferred_cb_skew(void *arg)
end: end:
for (i = 0; i < QUEUE_THREAD_COUNT; ++i) for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
THREAD_JOIN(load_threads[i]); THREAD_JOIN(load_threads[i]);
if (base)
event_base_free(base);
if (cfg)
event_config_free(cfg);
} }
static struct event time_events[5]; static struct event time_events[5];
@ -580,7 +593,8 @@ struct testcase_t thread_testcases[] = {
&basic_setup, (char*)"forking" }, &basic_setup, (char*)"forking" },
#endif #endif
TEST(conditions_simple), TEST(conditions_simple),
TEST(deferred_cb_skew), { "deferred_cb_skew", thread_deferred_cb_skew, TT_FORK|TT_NEED_THREADS,
&basic_setup, NULL },
TEST(no_events), TEST(no_events),
END_OF_TESTCASES END_OF_TESTCASES
}; };