First tests for IOCP loop, and related fixes.

The fixes are: a shutdown mode that works, and a way to activate an
arbitrary event_overlapped.

svn:r1254
This commit is contained in:
Nick Mathewson 2009-04-30 19:04:44 +00:00
parent b45cead762
commit f1090833b2
3 changed files with 252 additions and 19 deletions

View File

@ -26,6 +26,7 @@
#include <windows.h> #include <windows.h>
#include <process.h> #include <process.h>
#include <stdio.h>
#include "event2/util.h" #include "event2/util.h"
#include "util-internal.h" #include "util-internal.h"
@ -33,6 +34,8 @@
#include "log-internal.h" #include "log-internal.h"
#include "mm-internal.h" #include "mm-internal.h"
#define NOTIFICATION_KEY ((ULONG_PTR)-1)
void void
event_overlapped_init(struct event_overlapped *o, iocp_callback cb) event_overlapped_init(struct event_overlapped *o, iocp_callback cb)
{ {
@ -56,18 +59,30 @@ loop(void *_port)
ULONG_PTR key; ULONG_PTR key;
DWORD bytes; DWORD bytes;
long ms = port->ms; long ms = port->ms;
HANDLE p = port->port;
if (ms <= 0) if (ms <= 0)
ms = INFINITE; ms = INFINITE;
while (GetQueuedCompletionStatus(p, &bytes, &key,
while (GetQueuedCompletionStatus(port->port, &bytes, &key,
&overlapped, ms)) { &overlapped, ms)) {
if (port->shutdown) EnterCriticalSection(&port->lock);
if (port->shutdown) {
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
LeaveCriticalSection(&port->lock);
return; return;
}
LeaveCriticalSection(&port->lock);
if (key != NOTIFICATION_KEY)
handle_entry(overlapped, key, bytes); handle_entry(overlapped, key, bytes);
} }
event_warnx("GetQueuedCompletionStatus exited with no event."); event_warnx("GetQueuedCompletionStatus exited with no event.");
EnterCriticalSection(&port->lock);
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
LeaveCriticalSection(&port->lock);
} }
int int
@ -85,26 +100,97 @@ struct event_iocp_port *
event_iocp_port_launch(void) event_iocp_port_launch(void)
{ {
struct event_iocp_port *port; struct event_iocp_port *port;
int thread, i; int i;
if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
return NULL; return NULL;
port->n_threads = 2; port->n_threads = 2;
port->port = CreateIoCompletionPort(NULL, NULL, 0, port->n_threads); port->threads = calloc(port->n_threads, sizeof(HANDLE));
if (!port->threads)
goto err;
port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, port->n_threads);
port->ms = -1; port->ms = -1;
if (!port->port) if (!port->port)
mm_free(port); goto err;
for (i=0; i<port->n_threads; ++i) port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
thread = _beginthread(loop, 0, port); if (!port->shutdownSemaphore)
goto err;
for (i=0; i<port->n_threads; ++i) {
uintptr_t th = _beginthread(loop, 0, port);
if (th == (uintptr_t)-1)
goto err;
port->threads[i] = (HANDLE)th;
++port->n_live_threads;
}
InitializeCriticalSection(&port->lock);
return port; return port;
err:
if (port->port)
CloseHandle(port->port);
if (port->threads)
mm_free(port->threads);
if (port->shutdownSemaphore)
CloseHandle(port->shutdownSemaphore);
mm_free(port);
return NULL;
} }
static void
void _event_iocp_port_unlock_and_free(struct event_iocp_port *port)
event_iocp_shutdown(struct event_iocp_port *port)
{ {
port->shutdown = 1; DeleteCriticalSection(&port->lock);
/* XXX notify. */ CloseHandle(port->port);
CloseHandle(port->shutdownSemaphore);
mm_free(port->threads);
mm_free(port);
}
static int
event_iocp_notify_all(struct event_iocp_port *port)
{
int i, r, ok=1;
for (i=0; i<port->n_threads; ++i) {
r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY,
NULL);
if (!r)
ok = 0;
}
return ok ? 0 : -1;
}
int
event_iocp_shutdown(struct event_iocp_port *port, long waitMsec)
{
int n;
EnterCriticalSection(&port->lock);
port->shutdown = 1;
LeaveCriticalSection(&port->lock);
event_iocp_notify_all(port);
WaitForSingleObject(port->shutdownSemaphore, waitMsec);
EnterCriticalSection(&port->lock);
n = port->n_live_threads;
LeaveCriticalSection(&port->lock);
if (n == 0) {
_event_iocp_port_unlock_and_free(port);
return 0;
} else {
return -1;
}
}
int
event_iocp_activate_overlapped(
struct event_iocp_port *port, struct event_overlapped *o,
uintptr_t key, ev_uint32_t n)
{
BOOL r;
r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped);
return (r==0) ? -1 : 0;
} }

View File

@ -60,13 +60,26 @@ struct event_overlapped {
struct event_iocp_port { struct event_iocp_port {
/** The port itself */ /** The port itself */
HANDLE port; HANDLE port;
/** Number of threads open on the port. */ /* A lock to cover internal structures. */
int n_threads; CRITICAL_SECTION lock;
/** Number of threads ever open on the port. */
short n_threads;
/** True iff we're shutting down all the threads on this port */ /** True iff we're shutting down all the threads on this port */
int shutdown; short shutdown;
/** How often the threads on this port check for shutdown and other /** How often the threads on this port check for shutdown and other
* conditions */ * conditions */
long ms; long ms;
/* The threads that are waiting for events. */
HANDLE *threads;
/** Number of threads currently open on this port. */
short n_live_threads;
/* A semaphore to signal when we are done shutting down. */
HANDLE *shutdownSemaphore;
};
#else
/* Dummy definition so we can test-compile more things on unix. */
struct event_overlapped {
iocp_callback cb;
}; };
#endif #endif
@ -120,8 +133,18 @@ struct event_iocp_port *event_iocp_port_launch(void);
int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd, int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd,
uintptr_t key); uintptr_t key);
/** Shut down all threads serving an iocp. */ /** Tell all threads serving an iocp to stop. Wait for up to waitMsec for all
void event_iocp_shutdown(struct event_iocp_port *port); the threads to finish whatever they're doing. If all the threads are
done, free the port and return 0. Otherwise, return -1. If you get a -1
return value, it is safe to call this function again.
*/
int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec);
/* FIXME document. */
int event_iocp_activate_overlapped(struct event_iocp_port *port,
struct event_overlapped *o,
uintptr_t key, ev_uint32_t n_bytes);
#ifdef __cplusplus #ifdef __cplusplus
} }

124
test/regress_iocp.c Normal file
View File

@ -0,0 +1,124 @@
/*
* Copyright (c) 2009 Niels Provos and Nick Mathewson
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdlib.h>
#include <event2/event.h>
#include <event2/thread.h>
#include "tinytest.h"
#include "tinytest_macros.h"
#include "iocp-internal.h"
#include "evthread-internal.h"
#define MAX_CALLS 16
struct dummy_overlapped {
struct event_overlapped eo;
void *lock;
int call_count;
uintptr_t keys[MAX_CALLS];
ssize_t sizes[MAX_CALLS];
};
static void
dummy_cb(struct event_overlapped *o, uintptr_t key, ssize_t n)
{
struct dummy_overlapped *d_o =
EVUTIL_UPCAST(o, struct dummy_overlapped, eo);
EVLOCK_LOCK(d_o->lock, EVTHREAD_WRITE);
if (d_o->call_count < MAX_CALLS) {
d_o->keys[d_o->call_count] = key;
d_o->sizes[d_o->call_count] = n;
}
d_o->call_count++;
EVLOCK_UNLOCK(d_o->lock, EVTHREAD_WRITE);
}
static int
pair_is_in(struct dummy_overlapped *o, uintptr_t key, ssize_t n)
{
int i;
int result = 0;
EVLOCK_LOCK(o->lock, EVTHREAD_WRITE);
for (i=0; i < o->call_count; ++i) {
if (o->keys[i] == key && o->sizes[i] == n) {
result = 1;
break;
}
}
EVLOCK_UNLOCK(o->lock, EVTHREAD_WRITE);
return result;
}
static void
test_iocp_port(void *loop)
{
struct event_iocp_port *port = NULL;
struct dummy_overlapped o1, o2;
#ifdef WIN32
evthread_use_windows_threads();
#endif
memset(&o1, 0, sizeof(o1));
memset(&o2, 0, sizeof(o2));
EVTHREAD_ALLOC_LOCK(o1.lock);
EVTHREAD_ALLOC_LOCK(o2.lock);
tt_assert(o1.lock);
tt_assert(o2.lock);
event_overlapped_init(&o1.eo, dummy_cb);
event_overlapped_init(&o2.eo, dummy_cb);
port = event_iocp_port_launch();
tt_assert(port);
tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 10, 105));
tt_assert(!event_iocp_activate_overlapped(port, &o2.eo, 25, 205));
#ifdef WIN32
/* FIXME Be smarter. */
Sleep(1000);
#endif
tt_want(!event_iocp_shutdown(port, 2000));
tt_int_op(o1.call_count, ==, 1);
tt_int_op(o2.call_count, ==, 1);
tt_want(pair_is_in(&o1, 10, 105));
tt_want(pair_is_in(&o2, 25, 205));
end:
/* FIXME free the locks. */
;
}
struct testcase_t iocp_testcases[] = {
{ "iocp_port", test_iocp_port, TT_FORK, NULL, NULL },
END_OF_TESTCASES
};