mirror of
https://github.com/libevent/libevent.git
synced 2025-01-09 00:56:20 +08:00
Commit ConnectEx code to get connect working with async bufferevents.
This is code by Chris Davis, with changes to get the unit tests failing less aggressively. The unit tests for this code do not completely pass yet; Chris is looking into that. If they aren't passing by the next release, I'll turn off this code. svn:r1499
This commit is contained in:
parent
6ca32df11a
commit
86db1c851b
@ -316,3 +316,14 @@ _evbuffer_overlapped_get_fd(struct evbuffer *buf)
|
||||
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
|
||||
return buf_o ? buf_o->fd : -1;
|
||||
}
|
||||
|
||||
void
|
||||
_evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd)
|
||||
{
|
||||
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
|
||||
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
|
||||
/* XXX is this right?, should it cancel current I/O operations? */
|
||||
if (buf_o)
|
||||
buf_o->fd = fd;
|
||||
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
|
||||
}
|
||||
|
@ -141,6 +141,17 @@ extern const struct bufferevent_ops bufferevent_ops_socket;
|
||||
extern const struct bufferevent_ops bufferevent_ops_filter;
|
||||
extern const struct bufferevent_ops bufferevent_ops_pair;
|
||||
|
||||
#define BEV_IS_SOCKET(bevp) ((bevp)->be_ops == &bufferevent_ops_socket)
|
||||
#define BEV_IS_FILTER(bevp) ((bevp)->be_ops == &bufferevent_ops_filter)
|
||||
#define BEV_IS_PAIR(bevp) ((bevp)->be_ops == &bufferevent_ops_pair)
|
||||
|
||||
#ifdef WIN32
|
||||
extern const struct bufferevent_ops bufferevent_ops_async;
|
||||
#define BEV_IS_ASYNC(bevp) ((bevp)->be_ops == &bufferevent_ops_async)
|
||||
#else
|
||||
#define BEV_IS_ASYNC(bevp) 0
|
||||
#endif
|
||||
|
||||
/** Initialize the shared parts of a bufferevent. */
|
||||
int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
|
||||
|
||||
|
@ -45,6 +45,7 @@
|
||||
|
||||
#ifdef WIN32
|
||||
#include <winsock2.h>
|
||||
#include <ws2tcpip.h>
|
||||
#endif
|
||||
|
||||
#include "event2/util.h"
|
||||
@ -78,6 +79,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
|
||||
|
||||
struct bufferevent_async {
|
||||
struct bufferevent_private bev;
|
||||
struct event_overlapped connect_overlapped;
|
||||
unsigned read_in_progress : 1;
|
||||
unsigned write_in_progress : 1;
|
||||
};
|
||||
@ -93,6 +95,15 @@ upcast(struct bufferevent *bev)
|
||||
return bev_a;
|
||||
}
|
||||
|
||||
static inline struct bufferevent_async *
|
||||
upcast_overlapped(struct event_overlapped *eo)
|
||||
{
|
||||
struct bufferevent_async *bev_a;
|
||||
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
|
||||
EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
|
||||
return bev_a;
|
||||
}
|
||||
|
||||
static void
|
||||
bev_async_consider_writing(struct bufferevent_async *b)
|
||||
{
|
||||
@ -244,6 +255,24 @@ be_async_flush(struct bufferevent *bev, short what,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
connect_complete(struct event_overlapped *eo, uintptr_t key,
|
||||
ev_ssize_t nbytes, int ok)
|
||||
{
|
||||
struct bufferevent_async *bev_a = upcast_overlapped(eo);
|
||||
struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
|
||||
|
||||
_bufferevent_incref_and_lock(bev);
|
||||
|
||||
EVUTIL_ASSERT(bev_a->bev.connecting);
|
||||
bev_a->bev.connecting = 0;
|
||||
|
||||
_bufferevent_run_eventcb(bev,
|
||||
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
|
||||
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
}
|
||||
|
||||
struct bufferevent *
|
||||
bufferevent_async_new(struct event_base *base,
|
||||
evutil_socket_t fd, int options)
|
||||
@ -257,8 +286,14 @@ bufferevent_async_new(struct event_base *base,
|
||||
if (!(iocp = event_base_get_iocp(base)))
|
||||
return NULL;
|
||||
|
||||
if (event_iocp_port_associate(iocp, fd, 1)<0)
|
||||
return NULL;
|
||||
if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
|
||||
int err = GetLastError();
|
||||
/* We may have alrady associated this fd with a port.
|
||||
* Let's hope it's this port, and that the error code
|
||||
* for doing this neer changes. */
|
||||
if (err != ERROR_INVALID_PARAMETER)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
|
||||
return NULL;
|
||||
@ -283,14 +318,72 @@ bufferevent_async_new(struct event_base *base,
|
||||
evbuffer_defer_callbacks(bev->input, base);
|
||||
evbuffer_defer_callbacks(bev->output, base);
|
||||
|
||||
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
|
||||
_bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
|
||||
|
||||
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
|
||||
|
||||
return bev;
|
||||
err:
|
||||
bufferevent_free(&bev_a->bev.bev);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int
|
||||
bufferevent_async_can_connect(struct bufferevent *bev)
|
||||
{
|
||||
const struct win32_extension_fns *ext =
|
||||
event_get_win32_extension_fns();
|
||||
|
||||
if (BEV_IS_ASYNC(bev) &&
|
||||
event_base_get_iocp(bev->ev_base) &&
|
||||
ext && ext->ConnectEx)
|
||||
return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
|
||||
const struct sockaddr *sa, int socklen)
|
||||
{
|
||||
BOOL rc;
|
||||
struct bufferevent_async *bev_async = upcast(bev);
|
||||
struct sockaddr_storage ss;
|
||||
const struct win32_extension_fns *ext =
|
||||
event_get_win32_extension_fns();
|
||||
|
||||
EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
|
||||
|
||||
/* ConnectEx() requires that the socket be bound to an address
|
||||
* with bind() before using, otherwise it will fail. We attempt
|
||||
* to issue a bind() here, taking into account that the error
|
||||
* code is set to WSAEINVAL when the socket is already bound. */
|
||||
memset(&ss, 0, sizeof(ss));
|
||||
if (sa->sa_family == AF_INET) {
|
||||
struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
|
||||
sin->sin_family = AF_INET;
|
||||
sin->sin_addr.s_addr = INADDR_ANY;
|
||||
} else if (sa->sa_family == AF_INET6) {
|
||||
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
|
||||
sin6->sin6_family = AF_INET6;
|
||||
sin6->sin6_addr = in6addr_any;
|
||||
} else {
|
||||
/* XXX: what to do? */
|
||||
return -1;
|
||||
}
|
||||
if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
|
||||
WSAGetLastError() != WSAEINVAL)
|
||||
return -1;
|
||||
|
||||
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
|
||||
&bev_async->connect_overlapped.overlapped);
|
||||
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
|
||||
return 0;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int
|
||||
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
|
||||
union bufferevent_ctrl_data *data)
|
||||
@ -299,7 +392,19 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
|
||||
case BEV_CTRL_GET_FD:
|
||||
data->fd = _evbuffer_overlapped_get_fd(bev->input);
|
||||
return 0;
|
||||
case BEV_CTRL_SET_FD:
|
||||
case BEV_CTRL_SET_FD: {
|
||||
struct event_iocp_port *iocp;
|
||||
|
||||
if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
|
||||
return 0;
|
||||
if (!(iocp = event_base_get_iocp(bev->ev_base)))
|
||||
return -1;
|
||||
if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
|
||||
return -1;
|
||||
_evbuffer_overlapped_set_fd(bev->input, data->fd);
|
||||
_evbuffer_overlapped_set_fd(bev->output, data->fd);
|
||||
return 0;
|
||||
}
|
||||
case BEV_CTRL_GET_UNDERLYING:
|
||||
default:
|
||||
return -1;
|
||||
|
@ -215,7 +215,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
|
||||
} else {
|
||||
connected = 1;
|
||||
_bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
|
||||
if (!(bufev->enabled & EV_WRITE)) {
|
||||
if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) {
|
||||
event_del(&bufev->ev_write);
|
||||
goto done;
|
||||
}
|
||||
@ -332,18 +332,33 @@ bufferevent_socket_connect(struct bufferevent *bev,
|
||||
ownfd = 1;
|
||||
}
|
||||
if (sa) {
|
||||
r = evutil_socket_connect(&fd, sa, socklen);
|
||||
if (r < 0) {
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
|
||||
if (ownfd)
|
||||
EVUTIL_CLOSESOCKET(fd);
|
||||
/* do something about the error? */
|
||||
#ifdef WIN32
|
||||
if (bufferevent_async_can_connect(bev)) {
|
||||
bufferevent_setfd(bev, fd);
|
||||
r = bufferevent_async_connect(bev, fd, sa, socklen);
|
||||
if (r < 0)
|
||||
goto freesock;
|
||||
bufev_p->connecting = 1;
|
||||
result = 0;
|
||||
goto done;
|
||||
}
|
||||
} else
|
||||
#endif
|
||||
r = evutil_socket_connect(&fd, sa, socklen);
|
||||
if (r < 0)
|
||||
goto freesock;
|
||||
}
|
||||
#ifdef WIN32
|
||||
/* ConnectEx() isn't always around, even when IOCP is enabled.
|
||||
* Here, we borrow the socket object's write handler to fall back
|
||||
* on a non-blocking connect() when ConnectEx() is unavailable. */
|
||||
if (BEV_IS_ASYNC(bev)) {
|
||||
event_assign(&bev->ev_write, bev->ev_base, fd,
|
||||
EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
|
||||
}
|
||||
#endif
|
||||
bufferevent_setfd(bev, fd);
|
||||
if (r == 0) {
|
||||
if (! bufferevent_enable(bev, EV_WRITE)) {
|
||||
if (! be_socket_enable(bev, EV_WRITE)) {
|
||||
bufev_p->connecting = 1;
|
||||
result = 0;
|
||||
goto done;
|
||||
@ -354,6 +369,14 @@ bufferevent_socket_connect(struct bufferevent *bev,
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_CONNECTED);
|
||||
}
|
||||
|
||||
goto done;
|
||||
|
||||
freesock:
|
||||
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
|
||||
if (ownfd)
|
||||
EVUTIL_CLOSESOCKET(fd);
|
||||
/* do something about the error? */
|
||||
|
||||
done:
|
||||
_bufferevent_decref_and_unlock(bev);
|
||||
return result;
|
||||
|
@ -117,6 +117,8 @@ struct evbuffer *evbuffer_overlapped_new(evutil_socket_t fd);
|
||||
/** XXXX Document (nickm) */
|
||||
evutil_socket_t _evbuffer_overlapped_get_fd(struct evbuffer *buf);
|
||||
|
||||
void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd);
|
||||
|
||||
/** Start reading data onto the end of an overlapped evbuffer.
|
||||
|
||||
An evbuffer can only have one read pending at a time. While the read
|
||||
@ -176,6 +178,10 @@ int event_base_start_iocp(struct event_base *base);
|
||||
struct bufferevent *bufferevent_async_new(struct event_base *base,
|
||||
evutil_socket_t fd, int options);
|
||||
|
||||
/* FIXME document. */
|
||||
int bufferevent_async_can_connect(struct bufferevent *bev);
|
||||
int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
|
||||
const struct sockaddr *sa, int socklen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
@ -84,7 +84,8 @@ void run_legacy_test_fn(void *ptr);
|
||||
#define TT_LEGACY (TT_FIRST_USER_FLAG<<3)
|
||||
#define TT_NEED_THREADS (TT_FIRST_USER_FLAG<<4)
|
||||
#define TT_NO_LOGS (TT_FIRST_USER_FLAG<<5)
|
||||
#define TT_ENABLE_IOCP (TT_FIRST_USER_FLAG<<6)
|
||||
#define TT_ENABLE_IOCP_FLAG (TT_FIRST_USER_FLAG<<6)
|
||||
#define TT_ENABLE_IOCP (TT_ENABLE_IOCP_FLAG|TT_NEED_THREADS)
|
||||
|
||||
/* All the flags that a legacy test needs. */
|
||||
#define TT_ISOLATED TT_FORK|TT_NEED_SOCKETPAIR|TT_NEED_BASE
|
||||
|
@ -73,6 +73,9 @@
|
||||
#include "event2/util.h"
|
||||
|
||||
#include "bufferevent-internal.h"
|
||||
#ifdef WIN32
|
||||
#include "iocp-internal.h"
|
||||
#endif
|
||||
|
||||
#include "regress.h"
|
||||
|
||||
@ -412,10 +415,14 @@ listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
|
||||
struct event_base *base = arg;
|
||||
struct bufferevent *bev;
|
||||
const char s[] = TEST_STR;
|
||||
TT_BLATHER(("Got a request on socket %d", (int)fd ));
|
||||
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
|
||||
tt_assert(bev);
|
||||
bufferevent_write(bev, s, sizeof(s));
|
||||
bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
|
||||
bufferevent_enable(bev, EV_WRITE);
|
||||
end:
|
||||
;
|
||||
}
|
||||
|
||||
static void
|
||||
@ -459,6 +466,14 @@ test_bufferevent_connect(void *arg)
|
||||
if (strstr((char*)data->setup_data, "lock")) {
|
||||
be_flags |= BEV_OPT_THREADSAFE;
|
||||
}
|
||||
#ifdef WIN32
|
||||
if (!strcmp((char*)data->setup_data, "unset_connectex")) {
|
||||
struct win32_extension_fns *ext =
|
||||
(struct win32_extension_fns *)
|
||||
event_get_win32_extension_fns();
|
||||
ext->ConnectEx = NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
memset(&localhost, 0, sizeof(localhost));
|
||||
|
||||
@ -616,17 +631,23 @@ struct testcase_t bufferevent_iocp_testcases[] = {
|
||||
LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
|
||||
LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
|
||||
#if 0
|
||||
{ "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
|
||||
&basic_setup, (void*)"" },
|
||||
{ "bufferevent_connect", test_bufferevent_connect,
|
||||
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
|
||||
{ "bufferevent_connect_defer", test_bufferevent_connect,
|
||||
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
|
||||
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" },
|
||||
{ "bufferevent_connect_lock", test_bufferevent_connect,
|
||||
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
|
||||
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
|
||||
(void*)"lock" },
|
||||
{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
|
||||
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
|
||||
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
|
||||
(void*)"defer lock" },
|
||||
#endif
|
||||
{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
|
||||
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
|
||||
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
|
||||
#if 0
|
||||
{ "bufferevent_connect_nonblocking", test_bufferevent_connect,
|
||||
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
|
||||
(void*)"unset_connectex" },
|
||||
#endif
|
||||
|
||||
END_OF_TESTCASES,
|
||||
|
@ -38,6 +38,11 @@
|
||||
#include "tinytest.h"
|
||||
#include "tinytest_macros.h"
|
||||
|
||||
#define WIN32_LEAN_AND_MEAN
|
||||
#include <windows.h>
|
||||
#include <winsock2.h>
|
||||
#undef WIN32_LEAN_AND_MEAN
|
||||
|
||||
#include "iocp-internal.h"
|
||||
#include "evthread-internal.h"
|
||||
|
||||
|
@ -135,7 +135,7 @@ basic_test_setup(const struct testcase_t *testcase)
|
||||
struct basic_test_data *data = NULL;
|
||||
|
||||
#ifndef WIN32
|
||||
if (testcase->flags & TT_ENABLE_IOCP)
|
||||
if (testcase->flags & TT_ENABLE_IOCP_FLAG)
|
||||
return (void*)TT_SKIP;
|
||||
#endif
|
||||
|
||||
@ -177,7 +177,7 @@ basic_test_setup(const struct testcase_t *testcase)
|
||||
if (!base)
|
||||
exit(1);
|
||||
}
|
||||
if (testcase->flags & TT_ENABLE_IOCP) {
|
||||
if (testcase->flags & TT_ENABLE_IOCP_FLAG) {
|
||||
if (event_base_start_iocp(base)<0) {
|
||||
event_base_free(base);
|
||||
return (void*)TT_SKIP;
|
||||
|
Loading…
x
Reference in New Issue
Block a user