convert evhttp_connection to use bufferevents

svn:r742
This commit is contained in:
Niels Provos 2008-04-29 04:52:50 +00:00
parent 0ec09b5507
commit e44ef375ee
3 changed files with 179 additions and 162 deletions

View File

@ -76,6 +76,7 @@ Changes in current version:
o fix a bug in which bufferevent_write_buffer would not schedule a write event
o provide bufferevent_input and bufferevent_output without requiring knowledge of the structure
o introduce bufferevent_setcb and bufferevent_setfd to allow better manipulation of bufferevents
o convert evhttp_connection to use bufferevents.
Changes in 1.4.0:
o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr.

View File

@ -20,7 +20,8 @@
enum evhttp_connection_error {
EVCON_HTTP_TIMEOUT,
EVCON_HTTP_EOF,
EVCON_HTTP_INVALID_HEADER
EVCON_HTTP_INVALID_HEADER,
EVCON_HTTP_BUFFER_ERROR
};
struct evbuffer;
@ -42,10 +43,10 @@ struct evhttp_connection {
TAILQ_ENTRY(evhttp_connection) (next);
evutil_socket_t fd;
struct event ev;
struct bufferevent *bufev;
struct event retry_ev; /* for retrying connects */
struct event close_ev;
struct evbuffer *input_buffer;
struct evbuffer *output_buffer;
char *bind_address; /* address to use for binding the src */
@ -56,6 +57,7 @@ struct evhttp_connection {
#define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */
#define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */
#define EVHTTP_CON_CLOSEDETECT 0x0004 /* detecting if persistent close */
#define EVHTTP_CON_GOTHEADERS 0x0008 /* done reading headers */
int timeout; /* timeout in seconds for events */
int retry_cnt; /* retry count */
@ -68,7 +70,7 @@ struct evhttp_connection {
TAILQ_HEAD(evcon_requestq, evhttp_request) requests;
void (*cb)(struct evhttp_connection *, void *);
void (*cb)(struct evhttp_connection *, void *);
void *cb_arg;
void (*closecb)(struct evhttp_connection *, void *);

324
http.c
View File

@ -164,8 +164,11 @@ static void evhttp_connection_stop_detectclose(
struct evhttp_connection *evcon);
static void evhttp_request_dispatch(struct evhttp_connection* evcon);
void evhttp_read(evutil_socket_t, short, void *);
void evhttp_write(evutil_socket_t, short, void *);
/* callbacks for bufferevent */
static void evhttp_read_cb(struct bufferevent *, void *);
static void evhttp_read_header_cb(struct bufferevent *bufev, void *arg);
static void evhttp_write_cb(struct bufferevent *, void *);
static void evhttp_error_cb(struct bufferevent *bufev, short what, void *arg);
#ifndef HAVE_STRSEP
/* strsep replacement for platforms that lack it. Only works if
@ -298,13 +301,8 @@ evhttp_write_buffer(struct evhttp_connection *evcon,
evcon->cb = cb;
evcon->cb_arg = arg;
/* check if the event is already pending */
if (event_pending(&evcon->ev, EV_WRITE|EV_TIMEOUT, NULL))
event_del(&evcon->ev);
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT);
bufferevent_disable(evcon->bufev, EV_READ);
bufferevent_enable(evcon->bufev, EV_WRITE);
}
/*
@ -321,7 +319,8 @@ evhttp_make_header_request(struct evhttp_connection *evcon,
/* Generate request line */
method = evhttp_method(req->type);
evbuffer_add_printf(evcon->output_buffer, "%s %s HTTP/%d.%d\r\n",
evbuffer_add_printf(bufferevent_output(evcon->bufev),
"%s %s HTTP/%d.%d\r\n",
method, req->uri, req->major, req->minor);
/* Add the content length on a post or put request if missing */
@ -398,7 +397,8 @@ static void
evhttp_make_header_response(struct evhttp_connection *evcon,
struct evhttp_request *req)
{
evbuffer_add_printf(evcon->output_buffer, "HTTP/%d.%d %d %s\r\n",
evbuffer_add_printf(bufferevent_output(evcon->bufev),
"HTTP/%d.%d %d %s\r\n",
req->major, req->minor, req->response_code,
req->response_code_line);
@ -439,6 +439,7 @@ void
evhttp_make_header(struct evhttp_connection *evcon, struct evhttp_request *req)
{
struct evkeyval *header;
struct evbuffer *output = bufferevent_output(evcon->bufev);
/*
* Depending if this is a HTTP request or response, we might need to
@ -451,17 +452,17 @@ evhttp_make_header(struct evhttp_connection *evcon, struct evhttp_request *req)
}
TAILQ_FOREACH(header, req->output_headers, next) {
evbuffer_add_printf(evcon->output_buffer, "%s: %s\r\n",
evbuffer_add_printf(output, "%s: %s\r\n",
header->key, header->value);
}
evbuffer_add(evcon->output_buffer, "\r\n", 2);
evbuffer_add(output, "\r\n", 2);
if (EVBUFFER_LENGTH(req->output_buffer) > 0) {
/*
* For a request, we add the POST data, for a reply, this
* is the regular data.
*/
evbuffer_add_buffer(evcon->output_buffer, req->output_buffer);
evbuffer_add_buffer(output, req->output_buffer);
}
}
@ -537,6 +538,7 @@ evhttp_connection_incoming_fail(struct evhttp_request *req,
*/
return (-1);
case EVCON_HTTP_INVALID_HEADER:
case EVCON_HTTP_BUFFER_ERROR:
default: /* xxx: probably should just error on default */
/* the callback looks at the uri to determine errors */
if (req->uri) {
@ -563,6 +565,8 @@ evhttp_connection_fail(struct evhttp_connection *evcon,
void *cb_arg;
assert(req != NULL);
bufferevent_disable(evcon->bufev, EV_READ|EV_WRITE);
if (evcon->flags & EVHTTP_CON_INCOMING) {
/*
* for incoming requests, there are two different
@ -598,35 +602,10 @@ evhttp_connection_fail(struct evhttp_connection *evcon,
(*cb)(NULL, cb_arg);
}
void
evhttp_write(evutil_socket_t fd, short what, void *arg)
static void
evhttp_write_cb(struct bufferevent *bufev, void *arg)
{
struct evhttp_connection *evcon = arg;
int n;
if (what == EV_TIMEOUT) {
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
return;
}
n = evbuffer_write(evcon->output_buffer, fd);
if (n == -1) {
event_debug(("%s: evbuffer_write", __func__));
evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
return;
}
if (n == 0) {
event_debug(("%s: write nothing", __func__));
evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
return;
}
if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) {
evhttp_add_event(&evcon->ev,
evcon->timeout, HTTP_WRITE_TIMEOUT);
return;
}
/* Activate our call back */
if (evcon->cb != NULL)
@ -745,12 +724,13 @@ evhttp_handle_chunked_read(struct evhttp_request *req, struct evbuffer *buf)
static void
evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req)
{
struct evbuffer *buf = evcon->input_buffer;
struct evbuffer *buf = bufferevent_input(evcon->bufev);
if (req->chunked) {
int res = evhttp_handle_chunked_read(req, buf);
if (res == 1) {
/* finished last chunk */
bufferevent_disable(evcon->bufev, EV_READ);
evhttp_connection_done(evcon);
return;
} else if (res == -1) {
@ -763,50 +743,33 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req)
/* Read until connection close. */
evbuffer_add_buffer(req->input_buffer, buf);
} else if (EVBUFFER_LENGTH(buf) >= req->ntoread) {
bufferevent_disable(evcon->bufev, EV_READ);
/* Completed content length */
evbuffer_remove_buffer(buf, req->input_buffer, req->ntoread);
req->ntoread = 0;
evhttp_connection_done(evcon);
return;
}
/* Read more! */
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
bufferevent_setcb(evcon->bufev,
evhttp_read_cb,
evhttp_write_cb,
evhttp_error_cb,
evcon);
bufferevent_enable(evcon->bufev, EV_READ);
}
/*
* Reads data into a buffer structure until no more data
* can be read on the file descriptor or we have read all
* the data that we wanted to read.
* Execute callback when done.
* Gets called when more data becomes available
*/
void
evhttp_read(evutil_socket_t fd, short what, void *arg)
static void
evhttp_read_cb(struct bufferevent *bufev, void *arg)
{
struct evhttp_connection *evcon = arg;
struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
struct evbuffer *buf = evcon->input_buffer;
int n, len;
if (what == EV_TIMEOUT) {
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
return;
}
n = evbuffer_read(buf, fd, -1);
len = EVBUFFER_LENGTH(buf);
event_debug(("%s: got %d on %d\n", __func__, n, fd));
if (n == -1) {
event_debug(("%s: evbuffer_read", __func__));
evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
return;
} else if (n == 0) {
/* Connection closed */
evhttp_connection_done(evcon);
return;
}
evhttp_read_body(evcon, req);
}
@ -852,8 +815,11 @@ evhttp_connection_free(struct evhttp_connection *evcon)
if (event_initialized(&evcon->close_ev))
event_del(&evcon->close_ev);
if (event_initialized(&evcon->ev))
event_del(&evcon->ev);
if (event_initialized(&evcon->retry_ev))
event_del(&evcon->retry_ev);
if (evcon->bufev != NULL)
bufferevent_free(evcon->bufev);
if (evcon->fd != -1)
EVUTIL_CLOSESOCKET(evcon->fd);
@ -864,12 +830,6 @@ evhttp_connection_free(struct evhttp_connection *evcon)
if (evcon->address != NULL)
mm_free(evcon->address);
if (evcon->input_buffer != NULL)
evbuffer_free(evcon->input_buffer);
if (evcon->output_buffer != NULL)
evbuffer_free(evcon->output_buffer);
mm_free(evcon);
}
@ -910,8 +870,7 @@ evhttp_request_dispatch(struct evhttp_connection* evcon)
void
evhttp_connection_reset(struct evhttp_connection *evcon)
{
if (event_initialized(&evcon->ev))
event_del(&evcon->ev);
bufferevent_disable(evcon->bufev, EV_READ|EV_WRITE);
if (evcon->fd != -1) {
/* inform interested parties about connection close */
@ -924,7 +883,7 @@ evhttp_connection_reset(struct evhttp_connection *evcon)
evcon->state = EVCON_DISCONNECTED;
/* remove unneeded flags */
evcon->flags &= ~EVHTTP_CON_CLOSEDETECT;
evcon->flags &= ~(EVHTTP_CON_CLOSEDETECT|EVHTTP_CON_GOTHEADERS);
}
static void
@ -964,23 +923,82 @@ evhttp_connection_retry(evutil_socket_t fd, short what, void *arg)
evhttp_connection_connect(evcon);
}
static void
evhttp_connection_cb_cleanup(struct evhttp_connection *evcon)
{
if (evcon->retry_max < 0 || evcon->retry_cnt < evcon->retry_max) {
evtimer_set(&evcon->retry_ev, evhttp_connection_retry, evcon);
EVHTTP_BASE_SET(evcon, &evcon->retry_ev);
evhttp_add_event(&evcon->retry_ev,
MIN(3600, 2 << evcon->retry_cnt),
HTTP_CONNECT_TIMEOUT);
evcon->retry_cnt++;
return;
}
evhttp_connection_reset(evcon);
/* for now, we just signal all requests by executing their callbacks */
while (TAILQ_FIRST(&evcon->requests) != NULL) {
struct evhttp_request *request = TAILQ_FIRST(&evcon->requests);
TAILQ_REMOVE(&evcon->requests, request, next);
request->evcon = NULL;
/* we might want to set an error here */
request->cb(request, request->cb_arg);
evhttp_request_free(request);
}
}
static void
evhttp_error_cb(struct bufferevent *bufev, short what, void *arg)
{
struct evhttp_connection *evcon = arg;
switch (evcon->state) {
case EVCON_CONNECTING:
if (what == EVBUFFER_TIMEOUT) {
event_debug(("%s: connection timeout for \"%s:%d\" on %d",
__func__, evcon->address, evcon->port,
evcon->fd));
evhttp_connection_cb_cleanup(evcon);
return;
}
break;
case EVCON_CONNECTED:
if (what == (EVBUFFER_READ|EVBUFFER_EOF) &&
(evcon->flags & EVHTTP_CON_GOTHEADERS)) {
/* EOF on read can be benign */
evhttp_connection_done(evcon);
return;
}
break;
case EVCON_DISCONNECTED:
default:
break;
}
if (what & EVBUFFER_TIMEOUT) {
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
} else if (what & EVBUFFER_EOF) {
evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
} else {
evhttp_connection_fail(evcon, EVCON_HTTP_BUFFER_ERROR);
}
}
/*
* Call back for asynchronous connection attempt.
*/
static void
evhttp_connectioncb(evutil_socket_t fd, short what, void *arg)
evhttp_connection_cb(struct bufferevent *bufev, void *arg)
{
struct evhttp_connection *evcon = arg;
int error;
socklen_t errsz = sizeof(error);
if (what == EV_TIMEOUT) {
event_debug(("%s: connection timeout for \"%s:%d\" on %d",
__func__, evcon->address, evcon->port, evcon->fd));
goto cleanup;
}
/* Check if the connection completed */
if (getsockopt(evcon->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
&errsz) == -1) {
@ -1003,32 +1021,28 @@ evhttp_connectioncb(evutil_socket_t fd, short what, void *arg)
/* Reset the retry count as we were successful in connecting */
evcon->retry_cnt = 0;
evcon->state = EVCON_CONNECTED;
evcon->flags &= ~EVHTTP_CON_GOTHEADERS;
/* reset the bufferevent cbs */
bufferevent_setcb(evcon->bufev,
evhttp_read_header_cb,
evhttp_write_cb,
evhttp_error_cb,
evcon);
if (evcon->timeout == -1)
bufferevent_settimeout(evcon->bufev,
HTTP_READ_TIMEOUT, HTTP_WRITE_TIMEOUT);
else
bufferevent_settimeout(evcon->bufev,
evcon->timeout, evcon->timeout);
/* try to start requests that have queued up on this connection */
evhttp_request_dispatch(evcon);
return;
cleanup:
if (evcon->retry_max < 0 || evcon->retry_cnt < evcon->retry_max) {
evtimer_set(&evcon->ev, evhttp_connection_retry, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp_add_event(&evcon->ev, MIN(3600, 2 << evcon->retry_cnt),
HTTP_CONNECT_TIMEOUT);
evcon->retry_cnt++;
return;
}
evhttp_connection_reset(evcon);
/* for now, we just signal all requests by executing their callbacks */
while (TAILQ_FIRST(&evcon->requests) != NULL) {
struct evhttp_request *request = TAILQ_FIRST(&evcon->requests);
TAILQ_REMOVE(&evcon->requests, request, next);
request->evcon = NULL;
/* we might want to set an error here */
request->cb(request, request->cb_arg);
evhttp_request_free(request);
}
evhttp_connection_cb_cleanup(evcon);
}
/*
@ -1366,32 +1380,15 @@ evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
evhttp_read_body(evcon, req);
}
void
evhttp_read_header(evutil_socket_t fd, short what, void *arg)
static void
evhttp_read_header_cb(struct bufferevent *bufev, void *arg)
{
struct evhttp_connection *evcon = arg;
struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
int n, res;
int res;
int fd = evcon->fd;
if (what == EV_TIMEOUT) {
event_debug(("%s: timeout on %d\n", __func__, fd));
evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
return;
}
n = evbuffer_read(evcon->input_buffer, fd, -1);
if (n == 0) {
event_debug(("%s: no more data on %d", __func__, fd));
evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
return;
}
if (n == -1) {
event_debug(("%s: bad read on %d", __func__, fd));
evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
return;
}
res = evhttp_parse_lines(req, evcon->input_buffer);
res = evhttp_parse_lines(req, bufferevent_input(evcon->bufev));
if (res == -1) {
/* Error while reading, terminate */
event_debug(("%s: bad header lines on %d\n", __func__, fd));
@ -1399,11 +1396,15 @@ evhttp_read_header(evutil_socket_t fd, short what, void *arg)
return;
} else if (res == 0) {
/* Need more header lines */
evhttp_add_event(&evcon->ev,
evcon->timeout, HTTP_READ_TIMEOUT);
return;
}
/* Disable reading for now */
bufferevent_disable(evcon->bufev, EV_READ);
/* we got all headers */
evcon->flags |= EVHTTP_CON_GOTHEADERS;
/* Done reading headers, do the real work */
switch (req->kind) {
case EVHTTP_REQUEST:
@ -1466,13 +1467,11 @@ evhttp_connection_new(const char *address, unsigned short port)
goto error;
}
if ((evcon->input_buffer = evbuffer_new()) == NULL) {
event_warn("%s: evbuffer_new failed", __func__);
goto error;
}
if ((evcon->output_buffer = evbuffer_new()) == NULL) {
event_warn("%s: evbuffer_new failed", __func__);
if ((evcon->bufev = bufferevent_new(-1,
evhttp_read_header_cb,
evhttp_write_cb,
evhttp_error_cb, evcon)) == NULL) {
event_warn("%s: bufferevent_new failed", __func__);
goto error;
}
@ -1493,6 +1492,7 @@ void evhttp_connection_set_base(struct evhttp_connection *evcon,
assert(evcon->base == NULL);
assert(evcon->state == EVCON_DISCONNECTED);
evcon->base = base;
bufferevent_base_set(base, evcon->bufev);
}
void
@ -1500,6 +1500,13 @@ evhttp_connection_set_timeout(struct evhttp_connection *evcon,
int timeout_in_secs)
{
evcon->timeout = timeout_in_secs;
if (evcon->timeout == -1)
bufferevent_settimeout(evcon->bufev,
HTTP_READ_TIMEOUT, HTTP_WRITE_TIMEOUT);
else
bufferevent_settimeout(evcon->bufev,
evcon->timeout, evcon->timeout);
}
void
@ -1549,9 +1556,15 @@ evhttp_connection_connect(struct evhttp_connection *evcon)
}
/* Set up a callback for successful connection setup */
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_connectioncb, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_CONNECT_TIMEOUT);
bufferevent_setfd(evcon->bufev, evcon->fd);
bufferevent_setcb(evcon->bufev,
NULL /* evhttp_read_cb */,
evhttp_connection_cb,
evhttp_error_cb, evcon);
bufferevent_settimeout(evcon->bufev, 0,
evcon->timeout != -1 ? evcon->timeout : HTTP_CONNECT_TIMEOUT);
/* make sure that we get a write callback */
bufferevent_enable(evcon->bufev, EV_WRITE);
evcon->state = EVCON_CONNECTING;
@ -1613,12 +1626,8 @@ void
evhttp_start_read(struct evhttp_connection *evcon)
{
/* Set up an event to read the headers */
if (event_initialized(&evcon->ev))
event_del(&evcon->ev);
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read_header, evcon);
EVHTTP_BASE_SET(evcon, &evcon->ev);
evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
bufferevent_disable(evcon->bufev, EV_WRITE);
bufferevent_enable(evcon->bufev, EV_READ);
}
static void
@ -1729,13 +1738,14 @@ evhttp_send_reply_start(struct evhttp_request *req, int code,
void
evhttp_send_reply_chunk(struct evhttp_request *req, struct evbuffer *databuf)
{
struct evbuffer *output = bufferevent_output(req->evcon->bufev);
if (req->chunked) {
evbuffer_add_printf(req->evcon->output_buffer, "%x\r\n",
evbuffer_add_printf(output, "%x\r\n",
(unsigned)EVBUFFER_LENGTH(databuf));
}
evbuffer_add_buffer(req->evcon->output_buffer, databuf);
evbuffer_add_buffer(output, databuf);
if (req->chunked) {
evbuffer_add(req->evcon->output_buffer, "\r\n", 2);
evbuffer_add(output, "\r\n", 2);
}
evhttp_write_buffer(req->evcon, NULL, NULL);
}
@ -1744,12 +1754,13 @@ void
evhttp_send_reply_end(struct evhttp_request *req)
{
struct evhttp_connection *evcon = req->evcon;
struct evbuffer *output = bufferevent_output(evcon->bufev);
if (req->chunked) {
evbuffer_add(req->evcon->output_buffer, "0\r\n\r\n", 5);
evbuffer_add(output, "0\r\n\r\n", 5);
evhttp_write_buffer(req->evcon, evhttp_send_done, NULL);
req->chunked = 0;
} else if (!event_pending(&evcon->ev, EV_WRITE|EV_TIMEOUT, NULL)) {
} else if (EVBUFFER_LENGTH(output) == 0) {
/* let the connection know that we are done with the request */
evhttp_send_done(evcon, NULL);
} else {
@ -2291,14 +2302,17 @@ evhttp_get_request_connection(
if (evcon == NULL)
return (NULL);
/* associate the base if we have one*/
evhttp_connection_set_base(evcon, http->base);
/* associate the base if we have one */
if (http->base != NULL)
evhttp_connection_set_base(evcon, http->base);
evcon->flags |= EVHTTP_CON_INCOMING;
evcon->state = EVCON_CONNECTED;
evcon->fd = fd;
bufferevent_setfd(evcon->bufev, fd);
return (evcon);
}