Make event_break threadsafe; make notify-thread mechanism a little more generic; let it use pipes where they work.

svn:r1019
This commit is contained in:
Nick Mathewson 2009-01-19 01:34:14 +00:00
parent 2b7febc80a
commit ec4cfa33c9
3 changed files with 80 additions and 18 deletions

View File

@ -146,7 +146,7 @@ AC_C_INLINE
AC_HEADER_TIME
dnl Checks for library functions.
AC_CHECK_FUNCS(gettimeofday vasprintf fcntl clock_gettime strtok_r strsep getaddrinfo getnameinfo strlcpy inet_ntop inet_pton signal sigaction strtoll inet_aton)
AC_CHECK_FUNCS(gettimeofday vasprintf fcntl clock_gettime strtok_r strsep getaddrinfo getnameinfo strlcpy inet_ntop inet_pton signal sigaction strtoll inet_aton pipe)
AC_CHECK_SIZEOF(long)

90
event.c
View File

@ -144,6 +144,10 @@ static void timeout_correct(struct event_base *, struct timeval *);
static void event_signal_closure(struct event_base *, struct event *ev);
static void event_periodic_closure(struct event_base *, struct event *ev);
static int evthread_notify_base(struct event_base *base, ev_uint8_t msg);
#define EVTHREAD_NOTIFY_MSG_RECALC 0
#define EVTHREAD_NOTIFY_MSG_LOOPBREAK 1
static void
detect_monotonic(void)
{
@ -675,7 +679,6 @@ event_base_loopexit(struct event_base *event_base, const struct timeval *tv)
event_base, tv));
}
/* not thread safe */
int
event_loopbreak(void)
{
@ -688,8 +691,12 @@ event_base_loopbreak(struct event_base *event_base)
if (event_base == NULL)
return (-1);
event_base->event_break = 1;
return (0);
if (!EVTHREAD_IN_THREAD(event_base)) {
return evthread_notify_base(event_base, EVTHREAD_NOTIFY_MSG_LOOPBREAK);
} else {
event_base->event_break = 1;
return (0);
}
}
@ -1030,6 +1037,16 @@ event_add(struct event *ev, const struct timeval *tv)
return (res);
}
static int
evthread_notify_base(struct event_base *base, ev_uint8_t msg)
{
char buf[1];
int r;
buf[0] = (char)msg;
r = send(base->th_notify_fd[1], buf, 1, 0);
return (r < 0) ? -1 : 0;
}
static inline int
event_add_internal(struct event *ev, const struct timeval *tv)
{
@ -1110,7 +1127,7 @@ event_add_internal(struct event *ev, const struct timeval *tv)
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && !EVTHREAD_IN_THREAD(base))
send(base->th_notify_fd[1], "", 1, 0);
evthread_notify_base(base, EVTHREAD_NOTIFY_MSG_RECALC);
return (res);
}
@ -1170,8 +1187,8 @@ event_del_internal(struct event *ev)
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && !EVTHREAD_IN_THREAD(base))
send(base->th_notify_fd[1], "", 1, 0);
evthread_notify_base(base, EVTHREAD_NOTIFY_MSG_RECALC);
return (res);
}
@ -1181,7 +1198,7 @@ event_active(struct event *ev, int res, short ncalls)
EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
event_active_internal(ev, res, ncalls);
EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
}
@ -1482,14 +1499,22 @@ evthread_set_locking_callback(struct event_base *base,
}
static void
evthread_ignore_fd(int fd, short what, void *arg)
evthread_notification_callback(int fd, short what, void *arg)
{
struct event_base *base = arg;
char buf[128];
unsigned char buf[128];
int n, i;
/* we're draining the socket */
while (recv(fd, buf, sizeof(buf), 0) != -1)
;
while ((n = recv(fd, (char*)buf, sizeof(buf), 0)) != -1) {
for (i=0;i<n;++i) {
if (buf[i] == EVTHREAD_NOTIFY_MSG_RECALC) {
/* ignore; this is just to make us call recalc/dispatch. */
} else if (buf[i] == EVTHREAD_NOTIFY_MSG_LOOPBREAK) {
event_base_loopbreak(base);
}
}
}
event_add(&base->th_notify, NULL);
}
@ -1508,25 +1533,56 @@ evthread_set_id_callback(struct event_base *base,
#endif
base->th_get_id = id_fn;
base->th_owner_id = (*id_fn)();
/*
* If another thread wants to add a new event, we need to notify
* the thread that owns the base to wakeup for rescheduling.
*/
if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
base->th_notify_fd) == -1)
event_sock_err(1, -1, "%s: socketpair", __func__);
evthread_make_base_notifiable(base);
}
int
evthread_make_base_notifiable(struct event_base *base)
{
if (!base)
return -1;
if (base->th_notify_fd[0] >= 0)
return 0;
#if defined(XXX_EVENT_HAVE_PIPE)
if ((base->evsel->features & EV_FEATURE_FDS)) {
if (pipe(base->th_notify_fd) < 0)
event_warn(1, "%s: pipe", __func__);
}
if (base->th_notify_fd[0] < 0)
#endif
{
if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
base->th_notify_fd) == -1) {
event_sock_warn(-1, "%s: socketpair", __func__);
return (-1);
}
}
evutil_make_socket_nonblocking(base->th_notify_fd[0]);
evutil_make_socket_nonblocking(base->th_notify_fd[1]);
// This can't be right, can it? We want writes to this socket to
// just succeed.
// evutil_make_socket_nonblocking(base->th_notify_fd[1]);
/* prepare an event that we can use for wakeup */
event_assign(&base->th_notify, base, base->th_notify_fd[0], EV_READ,
evthread_ignore_fd, base);
evthread_notification_callback, base);
/* we need to mark this as internal event */
base->th_notify.ev_flags |= EVLIST_INTERNAL;
/* XXX th_notify should have a very high priority. */
event_add(&base->th_notify, NULL);
return 0;
}
void

View File

@ -96,6 +96,12 @@ void evthread_set_locking_callback(struct event_base *base,
void evthread_set_id_callback(struct event_base *base,
unsigned long (*id_fn)(void));
/** Make sure it's safe to tell an event base to wake up from another thread.
@return 0 on success, -1 on failure.
*/
int evthread_make_base_notifiable(struct event_base *base);
#ifdef __cplusplus
}
#endif