Make threading functions global, like the mm_ functions. Use the libevent_pthread.la library in regress_pthread.

svn:r1121
This commit is contained in:
Nick Mathewson 2009-02-12 22:19:54 +00:00
parent d0a9c90e93
commit ec35eb5520
9 changed files with 162 additions and 143 deletions

View File

@ -114,17 +114,11 @@ struct event_base {
struct timeval tv_cache;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* threading support */
unsigned long th_owner_id;
unsigned long (*th_get_id)(void);
void *th_base_lock;
/* allocate/free locks */
void *(*th_alloc)(void);
void (*th_free)(void *lock);
/* lock or unlock a lock */
void (*th_lock)(int mode, void *lock);
#endif
/* Notify main thread to wake up break, etc. */
int th_notify_fd[2];
@ -142,6 +136,7 @@ struct event_config {
TAILQ_HEAD(event_configq, event_config_entry) entries;
enum event_method_feature require_features;
enum event_base_config_flag flags;
};
/* Internal use only: Functions that might be missing from <sys/queue.h> */
@ -170,3 +165,4 @@ int _evsig_restore_handler(struct event_base *base, int evsignal);
#endif
#endif /* _EVENT_INTERNAL_H_ */

112
event.c
View File

@ -297,6 +297,16 @@ event_base_new_with_config(struct event_config *cfg)
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
if (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK)) {
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_base_free(base);
return NULL;
}
}
return (base);
}
@ -323,8 +333,7 @@ event_base_free(struct event_base *base)
base->th_notify_fd[1] = -1;
}
if (base->th_base_lock != NULL)
(*base->th_free)(base->th_base_lock);
EVTHREAD_FREE_LOCK(base->th_base_lock);
/* Delete all non-internal events. */
for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) {
@ -607,7 +616,7 @@ event_process_active(struct event_base *base)
struct event_list *activeq = NULL;
int i;
EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
@ -631,7 +640,7 @@ event_process_active(struct event_base *base)
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
EVTHREAD_RELEASE_LOCK(base,
EVBASE_RELEASE_LOCK(base,
EVTHREAD_WRITE, th_base_lock);
if (ev->ev_closure != NULL)
@ -641,10 +650,10 @@ event_process_active(struct event_base *base)
(int)ev->ev_fd, ev->ev_res, ev->ev_arg);
if (event_gotsig || base->event_break)
return;
EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
/*
@ -704,11 +713,11 @@ event_base_loopbreak(struct event_base *event_base)
if (event_base == NULL)
return (-1);
EVTHREAD_ACQUIRE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
event_base->event_break = 1;
EVTHREAD_RELEASE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
if (!EVTHREAD_IN_THREAD(event_base)) {
if (!EVBASE_IN_THREAD(event_base)) {
return evthread_notify_base(event_base);
} else {
return (0);
@ -739,6 +748,11 @@ event_base_loop(struct event_base *base, int flags)
if (base->sig.ev_signal_added)
evsig_base = base;
done = 0;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif
while (!done) {
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
@ -1038,11 +1052,11 @@ event_add(struct event *ev, const struct timeval *tv)
{
int res;
EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
res = event_add_internal(ev, tv);
EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
@ -1169,7 +1183,7 @@ event_add_internal(struct event *ev, const struct timeval *tv)
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && !EVTHREAD_IN_THREAD(base))
if (res != -1 && !EVBASE_IN_THREAD(base))
evthread_notify_base(base);
return (res);
@ -1180,11 +1194,11 @@ event_del(struct event *ev)
{
int res;
EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
res = event_del_internal(ev);
EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
@ -1229,7 +1243,7 @@ event_del_internal(struct event *ev)
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && !EVTHREAD_IN_THREAD(base))
if (res != -1 && !EVBASE_IN_THREAD(base))
evthread_notify_base(base);
return (res);
@ -1238,11 +1252,11 @@ event_del_internal(struct event *ev)
void
event_active(struct event *ev, int res, short ncalls)
{
EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
event_active_internal(ev, res, ncalls);
EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
}
@ -1277,7 +1291,7 @@ timeout_next(struct event_base *base, struct timeval **tv_p)
struct timeval *tv = *tv_p;
int res = 0;
EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
ev = min_heap_top(&base->timeheap);
if (ev == NULL) {
@ -1303,7 +1317,7 @@ timeout_next(struct event_base *base, struct timeval **tv_p)
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
@ -1325,11 +1339,11 @@ timeout_correct(struct event_base *base, struct timeval *tv)
/* Check if time is running backwards */
gettime(base, tv);
EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (evutil_timercmp(tv, &base->event_tv, >=)) {
base->event_tv = *tv;
EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
@ -1347,7 +1361,7 @@ timeout_correct(struct event_base *base, struct timeval *tv)
struct timeval *ev_tv = &(**pev).ev_timeout;
evutil_timersub(ev_tv, &off, ev_tv);
}
EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
void
@ -1356,9 +1370,9 @@ timeout_process(struct event_base *base)
struct timeval now;
struct event *ev;
EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (min_heap_empty(&base->timeheap)) {
EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
@ -1375,7 +1389,7 @@ timeout_process(struct event_base *base)
ev->ev_callback));
event_active_internal(ev, EV_TIMEOUT, 1);
}
EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
void
@ -1530,15 +1544,21 @@ event_set_mem_functions(void *(*malloc_fn)(size_t sz),
_mm_free_fn = free_fn;
}
/* support for threading */
void (*_evthread_locking_fn)(int mode, void *lock) = NULL;
unsigned long (*_evthread_id_fn)(void) = NULL;
void *(*_evthread_lock_alloc_fn)(void) = NULL;
void (*_evthread_lock_free_fn)(void *) = NULL;
void
evthread_set_locking_callback(struct event_base *base,
void (*locking_fn)(int mode, void *lock))
evthread_set_locking_callback(void (*locking_fn)(int mode, void *lock))
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
event_errx(1, "%s: not compiled with thread support", __func__);
#else
_evthread_locking_fn = locking_fn;
#endif
base->th_lock = locking_fn;
}
#if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H)
@ -1565,25 +1585,13 @@ evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
}
void
evthread_set_id_callback(struct event_base *base,
unsigned long (*id_fn)(void))
evthread_set_id_callback(unsigned long (*id_fn)(void))
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
event_errx(1, "%s: not compiled with thread support", __func__);
#endif
#ifdef WIN32
#define LOCAL_SOCKETPAIR_AF AF_INET
#else
#define LOCAL_SOCKETPAIR_AF AF_UNIX
_evthread_id_fn = id_fn;
#endif
base->th_get_id = id_fn;
base->th_owner_id = (*id_fn)();
/*
* If another thread wants to add a new event, we need to notify
* the thread that owns the base to wakeup for rescheduling.
*/
evthread_make_base_notifiable(base);
}
int
@ -1613,10 +1621,16 @@ evthread_make_base_notifiable(struct event_base *base)
}
}
if (base->th_notify_fd[0] < 0)
#endif
#ifdef WIN32
#define LOCAL_SOCKETPAIR_AF AF_INET
#else
#define LOCAL_SOCKETPAIR_AF AF_UNIX
#endif
{
if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
base->th_notify_fd) == -1) {
base->th_notify_fd) == -1) {
event_sock_warn(-1, "%s: socketpair", __func__);
return (-1);
}
@ -1645,17 +1659,15 @@ evthread_make_base_notifiable(struct event_base *base)
}
void
evthread_set_lock_create_callbacks(struct event_base *base,
void *(*alloc_fn)(void), void (*free_fn)(void *))
evthread_set_lock_create_callbacks(void *(*alloc_fn)(void),
void (*free_fn)(void *))
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
event_errx(1, "%s: not compiled with thread support", __func__);
#else
_evthread_lock_alloc_fn = alloc_fn;
_evthread_lock_free_fn = free_fn;
#endif
base->th_alloc = alloc_fn;
base->th_free = free_fn;
/* now, let's allocate our lock */
base->th_base_lock = (*alloc_fn)();
}
void

View File

@ -27,40 +27,58 @@
#define _EVTHREAD_INTERNAL_H_
#ifdef __cplusplus
extern "C" {
//extern "C" {
#endif
#include "event-config.h"
struct event_base;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
#define EVTHREAD_USE_LOCKS(base) \
(base != NULL && (base)->th_lock != NULL)
extern void (*_evthread_locking_fn)(int mode, void *lock);
extern unsigned long (*_evthread_id_fn)(void);
extern void *(*_evthread_lock_alloc_fn)(void);
extern void (*_evthread_lock_free_fn)(void *);
#define EVTHREAD_IN_THREAD(base) \
((base)->th_get_id == NULL || \
(base)->th_owner_id == (*(base)->th_get_id)())
#define EVBASE_USING_LOCKS(base) \
(base != NULL && (base)->th_base_lock != NULL)
#define EVTHREAD_GET_ID(base) \
(*(base)->th_get_id)()
#define EVTHREAD_GET_ID() \
(_evthread_id_fn ? _evthread_id_fn() : 1)
#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock) do { \
if (EVTHREAD_USE_LOCKS(base)) \
(*(base)->th_lock)(EVTHREAD_LOCK | mode, \
(base)->lock); \
#define EVBASE_IN_THREAD(base) \
(_evthread_id_fn == NULL || \
(base)->th_owner_id == _evthread_id_fn())
#define EVTHREAD_ALLOC_LOCK(lockvar) \
((lockvar) = _evthread_lock_alloc_fn ? \
_evthread_lock_alloc_fn() : NULL)
#define EVTHREAD_FREE_LOCK(lockvar) \
do { \
if (lockvar && _evthread_lock_free_fn) \
_evthread_lock_free_fn(lockvar); \
} while (0);
#define EVBASE_ACQUIRE_LOCK(base, mode, lock) do { \
if (EVBASE_USING_LOCKS(base)) \
_evthread_locking_fn(EVTHREAD_LOCK | mode, \
(base)->lock); \
} while (0)
#define EVTHREAD_RELEASE_LOCK(base, mode, lock) do { \
if (EVTHREAD_USE_LOCKS(base)) \
(*(base)->th_lock)(EVTHREAD_UNLOCK | mode, \
(base)->lock); \
#define EVBASE_RELEASE_LOCK(base, mode, lock) do { \
if (EVBASE_USING_LOCKS(base)) \
_evthread_locking_fn(EVTHREAD_UNLOCK | mode, \
(base)->lock); \
} while (0)
#else /* _EVENT_DISABLE_THREAD_SUPPORT */
#define EVTHREAD_USE_LOCKS(base)
#define EVTHREAD_IN_THREAD(base) 1
#define EVTHREAD_GET_ID(base)
#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock)
#define EVTHREAD_RELEASE_LOCK(base, mode, lock)
#define EVTHREAD_GET_ID() 1
#define EVTHREAD_ALLOC_LOCK(lockvar)
#define EVTHREAD_FREE_LOCK(lockvar)
#define EVBASE_IN_THREAD() 1
#define EVBASE_ACQUIRE_LOCK(base, mode, lock)
#define EVBASE_RELEASE_LOCK(base, mode, lock)
#endif
#ifdef __cplusplus

View File

@ -27,19 +27,27 @@
#include "event-config.h"
#endif
/* With glibc we need to define this to get PTHREAD_MUTEX_RECURSIVE. */
#define _GNU_SOURCE
#include <pthread.h>
struct event_base;
#include <event2/thread.h>
#include "mm-internal.h"
static pthread_mutexattr_t attr_recursive;
static void *
evthread_posix_lock_create(void)
{
pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t));
if (!lock)
return NULL;
pthread_mutex_init(lock, NULL);
if (pthread_mutex_init(lock, &attr_recursive)) {
mm_free(lock);
return NULL;
}
return lock;
}
@ -73,12 +81,16 @@ evthread_posix_get_id(void)
}
int
evthread_use_pthreads(struct event_base *base)
evthread_use_pthreads(void)
{
evthread_set_lock_create_callbacks(base,
evthread_posix_lock_create,
evthread_posix_lock_free);
evthread_set_locking_callback(base, evthread_posix_lock);
evthread_set_id_callback(base, evthread_posix_get_id);
/* Set ourselves up to get recursive locks. */
pthread_mutexattr_init(&attr_recursive);
pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE);
evthread_set_lock_create_callbacks(
evthread_posix_lock_create,
evthread_posix_lock_free);
evthread_set_locking_callback(evthread_posix_lock);
evthread_set_id_callback(evthread_posix_get_id);
return -1;
}

View File

@ -32,6 +32,7 @@
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#undef WIN32_LEAN_AND_MEAN
#include <sys/locking.h>
#endif
struct event_base;
@ -73,13 +74,13 @@ evthread_win32_get_id(void)
}
int
evthread_use_windows_threads(struct event_base *base)
evthread_use_windows_threads(void)
{
evthread_set_lock_create_callbacks(base,
evthread_win32_lock_create,
evthread_win32_lock_free);
evthread_set_locking_callback(base, evthread_win32_lock);
evthread_set_id_callback(base, evthread_win32_get_id);
evthread_set_lock_create_callbacks(
evthread_win32_lock_create,
evthread_win32_lock_free);
evthread_set_locking_callback(evthread_win32_lock);
evthread_set_id_callback(evthread_win32_get_id);
return 0;
}

View File

@ -159,6 +159,10 @@ enum event_method_feature {
EV_FEATURE_FDS = 0x04,
};
enum event_base_config_flag {
EVENT_BASE_FLAG_NOLOCK = 0x01,
};
/**
Return a bitmask of the features implemented by an event base.
*/
@ -188,6 +192,11 @@ enum event_method_feature event_base_get_features(struct event_base *base);
int event_config_require_features(struct event_config *cfg,
enum event_method_feature feature);
/** Sets a flag to configure what parts of the eventual event_base will
* be initialized, and how they'll work. */
int event_config_set_flag(struct event_config *cfg,
enum event_base_config_flag flag);
/**
Initialize the event API.

View File

@ -34,6 +34,10 @@
add and delete events from a single event base, libevent needs to
lock its data structures.
Like the memory-management function hooks, all of the threading functions
_must_ be set up before an event_base is created if you want the base to
use them.
A multi-threaded application must provide locking functions to
libevent via evthread_set_locking_callback(). Libevent will invoke
this callback whenever a lock needs to be acquired or released.
@ -67,22 +71,23 @@ extern "C" {
evthread_set_locking_callback() before using libevent in a
multi-threaded application.
Locks must be recursive. That is, it must be safe for a thread to
acquire a lock that it already holds.
@param alloc_fn function to be called when allocating a new lock
@param free_fn function to be called to a free a lock
*/
void evthread_set_lock_create_callbacks(struct event_base *base,
void evthread_set_lock_create_callbacks(
void *(*alloc_fn)(void), void (*free_fn)(void *));
struct event_base;
/**
Sets the function libevent should use for locking.
@param base the event base for which the locking function should be set
@param locking_fn the function that libevent should invoke to acquire
or release a lock. mode has either EVTHREAD_LOCK or EVTHREAD_UNLOCK
set, and in addition, either EVHTREAD_WRITE or EVTREAD_READ.
*/
void evthread_set_locking_callback(struct event_base *base,
void evthread_set_locking_callback(
void (*locking_fn)(int mode, void *lock));
/**
@ -92,7 +97,7 @@ void evthread_set_locking_callback(struct event_base *base,
@param id_fn the identify function libevent should invoke to
determine the identity of a thread.
*/
void evthread_set_id_callback(struct event_base *base,
void evthread_set_id_callback(
unsigned long (*id_fn)(void));
/** Make sure it's safe to tell an event base to wake up from another thread.
@ -103,10 +108,10 @@ int evthread_make_base_notifiable(struct event_base *base);
#ifdef WIN32
/** Sets up libevent for use with Windows builtin locking and thread ID
functions. Unavailable if libevent is not build for Windows.
functions. Unavailable if libevent is not built for Windows.
@return 0 on success, -1 on failure. */
int evthread_use_windows_threads(struct event_base *base);
int evthread_use_windows_threads(void);
#endif
#ifdef _EVENT_HAVE_PTHREADS
@ -115,7 +120,7 @@ int evthread_use_windows_threads(struct event_base *base);
libraries to link against libevent_pthreads as well as libevent.
@return 0 on success, -1 on failure. */
int evthread_use_pthreads(struct event_base *base);
int evthread_use_pthreads(void);
#endif
#ifdef __cplusplus

View File

@ -25,6 +25,7 @@ regress_SOURCES = regress.c regress_buffer.c regress_http.c regress_dns.c \
$(regress_pthread_SOURCES) $(regress_zlib_SOURCES)
if PTHREADS
regress_pthread_SOURCES = regress_pthread.c
PTHREAD_LIBS += ../libevent_pthreads.la
endif
if ZLIB_REGRESS
regress_zlib_SOURCES = regress_zlib.c

View File

@ -128,55 +128,20 @@ pthread_basic(struct event_base *base)
fprintf(stdout, "OK\n");
}
static void
locking(int mode, void *lock)
{
if (mode & EVTHREAD_LOCK)
pthread_mutex_lock(lock);
else
pthread_mutex_unlock(lock);
}
static void *
alloc_lock(void)
{
pthread_mutex_t *lock = malloc(sizeof(*lock));
assert(lock != NULL);
pthread_mutex_init(lock, NULL);
return (lock);
}
static void
free_lock(void *lock)
{
pthread_mutex_destroy(lock);
free(lock);
}
static unsigned long
get_id(void)
{
union {
pthread_t thr;
unsigned long id;
} r;
r.id = 0;
r.thr = pthread_self();
return r.id;
}
void
regress_pthread(void)
{
struct event_base *base = event_base_new();
struct event_base *base;
pthread_mutex_init(&count_lock, NULL);
evthread_set_lock_create_callbacks(base, alloc_lock, free_lock);
evthread_set_locking_callback(base, locking);
evthread_set_id_callback(base, get_id);
evthread_use_pthreads();
base = event_base_new();
if (evthread_make_base_notifiable(base)<0) {
puts("Couldn't make base notifiable!");
return;
}
pthread_basic(base);