From e44ef375ee9a8b4c7e8d21a752e9e4864595758f Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Tue, 29 Apr 2008 04:52:50 +0000 Subject: [PATCH] convert evhttp_connection to use bufferevents svn:r742 --- ChangeLog | 1 + http-internal.h | 12 +- http.c | 328 +++++++++++++++++++++++++----------------------- 3 files changed, 179 insertions(+), 162 deletions(-) diff --git a/ChangeLog b/ChangeLog index 0b56306a..db680364 100644 --- a/ChangeLog +++ b/ChangeLog @@ -76,6 +76,7 @@ Changes in current version: o fix a bug in which bufferevent_write_buffer would not schedule a write event o provide bufferevent_input and bufferevent_output without requiring knowledge of the structure o introduce bufferevent_setcb and bufferevent_setfd to allow better manipulation of bufferevents + o convert evhttp_connection to use bufferevents. Changes in 1.4.0: o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr. diff --git a/http-internal.h b/http-internal.h index f1708655..48860f0f 100644 --- a/http-internal.h +++ b/http-internal.h @@ -20,7 +20,8 @@ enum evhttp_connection_error { EVCON_HTTP_TIMEOUT, EVCON_HTTP_EOF, - EVCON_HTTP_INVALID_HEADER + EVCON_HTTP_INVALID_HEADER, + EVCON_HTTP_BUFFER_ERROR }; struct evbuffer; @@ -42,10 +43,10 @@ struct evhttp_connection { TAILQ_ENTRY(evhttp_connection) (next); evutil_socket_t fd; - struct event ev; + struct bufferevent *bufev; + + struct event retry_ev; /* for retrying connects */ struct event close_ev; - struct evbuffer *input_buffer; - struct evbuffer *output_buffer; char *bind_address; /* address to use for binding the src */ @@ -56,6 +57,7 @@ struct evhttp_connection { #define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */ #define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */ #define EVHTTP_CON_CLOSEDETECT 0x0004 /* detecting if persistent close */ +#define EVHTTP_CON_GOTHEADERS 0x0008 /* done reading headers */ int timeout; /* timeout in seconds for events */ int retry_cnt; /* retry count */ @@ -68,7 +70,7 @@ struct evhttp_connection { TAILQ_HEAD(evcon_requestq, evhttp_request) requests; - void (*cb)(struct evhttp_connection *, void *); + void (*cb)(struct evhttp_connection *, void *); void *cb_arg; void (*closecb)(struct evhttp_connection *, void *); diff --git a/http.c b/http.c index 1695d610..dd2790c4 100644 --- a/http.c +++ b/http.c @@ -164,8 +164,11 @@ static void evhttp_connection_stop_detectclose( struct evhttp_connection *evcon); static void evhttp_request_dispatch(struct evhttp_connection* evcon); -void evhttp_read(evutil_socket_t, short, void *); -void evhttp_write(evutil_socket_t, short, void *); +/* callbacks for bufferevent */ +static void evhttp_read_cb(struct bufferevent *, void *); +static void evhttp_read_header_cb(struct bufferevent *bufev, void *arg); +static void evhttp_write_cb(struct bufferevent *, void *); +static void evhttp_error_cb(struct bufferevent *bufev, short what, void *arg); #ifndef HAVE_STRSEP /* strsep replacement for platforms that lack it. Only works if @@ -298,13 +301,8 @@ evhttp_write_buffer(struct evhttp_connection *evcon, evcon->cb = cb; evcon->cb_arg = arg; - /* check if the event is already pending */ - if (event_pending(&evcon->ev, EV_WRITE|EV_TIMEOUT, NULL)) - event_del(&evcon->ev); - - event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon); - EVHTTP_BASE_SET(evcon, &evcon->ev); - evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT); + bufferevent_disable(evcon->bufev, EV_READ); + bufferevent_enable(evcon->bufev, EV_WRITE); } /* @@ -321,7 +319,8 @@ evhttp_make_header_request(struct evhttp_connection *evcon, /* Generate request line */ method = evhttp_method(req->type); - evbuffer_add_printf(evcon->output_buffer, "%s %s HTTP/%d.%d\r\n", + evbuffer_add_printf(bufferevent_output(evcon->bufev), + "%s %s HTTP/%d.%d\r\n", method, req->uri, req->major, req->minor); /* Add the content length on a post or put request if missing */ @@ -398,7 +397,8 @@ static void evhttp_make_header_response(struct evhttp_connection *evcon, struct evhttp_request *req) { - evbuffer_add_printf(evcon->output_buffer, "HTTP/%d.%d %d %s\r\n", + evbuffer_add_printf(bufferevent_output(evcon->bufev), + "HTTP/%d.%d %d %s\r\n", req->major, req->minor, req->response_code, req->response_code_line); @@ -439,7 +439,8 @@ void evhttp_make_header(struct evhttp_connection *evcon, struct evhttp_request *req) { struct evkeyval *header; - + struct evbuffer *output = bufferevent_output(evcon->bufev); + /* * Depending if this is a HTTP request or response, we might need to * add some new headers or remove existing headers. @@ -451,17 +452,17 @@ evhttp_make_header(struct evhttp_connection *evcon, struct evhttp_request *req) } TAILQ_FOREACH(header, req->output_headers, next) { - evbuffer_add_printf(evcon->output_buffer, "%s: %s\r\n", + evbuffer_add_printf(output, "%s: %s\r\n", header->key, header->value); } - evbuffer_add(evcon->output_buffer, "\r\n", 2); + evbuffer_add(output, "\r\n", 2); if (EVBUFFER_LENGTH(req->output_buffer) > 0) { /* * For a request, we add the POST data, for a reply, this * is the regular data. */ - evbuffer_add_buffer(evcon->output_buffer, req->output_buffer); + evbuffer_add_buffer(output, req->output_buffer); } } @@ -537,6 +538,7 @@ evhttp_connection_incoming_fail(struct evhttp_request *req, */ return (-1); case EVCON_HTTP_INVALID_HEADER: + case EVCON_HTTP_BUFFER_ERROR: default: /* xxx: probably should just error on default */ /* the callback looks at the uri to determine errors */ if (req->uri) { @@ -563,6 +565,8 @@ evhttp_connection_fail(struct evhttp_connection *evcon, void *cb_arg; assert(req != NULL); + bufferevent_disable(evcon->bufev, EV_READ|EV_WRITE); + if (evcon->flags & EVHTTP_CON_INCOMING) { /* * for incoming requests, there are two different @@ -598,35 +602,10 @@ evhttp_connection_fail(struct evhttp_connection *evcon, (*cb)(NULL, cb_arg); } -void -evhttp_write(evutil_socket_t fd, short what, void *arg) +static void +evhttp_write_cb(struct bufferevent *bufev, void *arg) { struct evhttp_connection *evcon = arg; - int n; - - if (what == EV_TIMEOUT) { - evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); - return; - } - - n = evbuffer_write(evcon->output_buffer, fd); - if (n == -1) { - event_debug(("%s: evbuffer_write", __func__)); - evhttp_connection_fail(evcon, EVCON_HTTP_EOF); - return; - } - - if (n == 0) { - event_debug(("%s: write nothing", __func__)); - evhttp_connection_fail(evcon, EVCON_HTTP_EOF); - return; - } - - if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) { - evhttp_add_event(&evcon->ev, - evcon->timeout, HTTP_WRITE_TIMEOUT); - return; - } /* Activate our call back */ if (evcon->cb != NULL) @@ -745,12 +724,13 @@ evhttp_handle_chunked_read(struct evhttp_request *req, struct evbuffer *buf) static void evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req) { - struct evbuffer *buf = evcon->input_buffer; + struct evbuffer *buf = bufferevent_input(evcon->bufev); if (req->chunked) { int res = evhttp_handle_chunked_read(req, buf); if (res == 1) { /* finished last chunk */ + bufferevent_disable(evcon->bufev, EV_READ); evhttp_connection_done(evcon); return; } else if (res == -1) { @@ -763,50 +743,33 @@ evhttp_read_body(struct evhttp_connection *evcon, struct evhttp_request *req) /* Read until connection close. */ evbuffer_add_buffer(req->input_buffer, buf); } else if (EVBUFFER_LENGTH(buf) >= req->ntoread) { + bufferevent_disable(evcon->bufev, EV_READ); /* Completed content length */ evbuffer_remove_buffer(buf, req->input_buffer, req->ntoread); req->ntoread = 0; evhttp_connection_done(evcon); return; } + /* Read more! */ - event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon); - EVHTTP_BASE_SET(evcon, &evcon->ev); - evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT); + bufferevent_setcb(evcon->bufev, + evhttp_read_cb, + evhttp_write_cb, + evhttp_error_cb, + evcon); + bufferevent_enable(evcon->bufev, EV_READ); } /* - * Reads data into a buffer structure until no more data - * can be read on the file descriptor or we have read all - * the data that we wanted to read. - * Execute callback when done. + * Gets called when more data becomes available */ -void -evhttp_read(evutil_socket_t fd, short what, void *arg) +static void +evhttp_read_cb(struct bufferevent *bufev, void *arg) { struct evhttp_connection *evcon = arg; struct evhttp_request *req = TAILQ_FIRST(&evcon->requests); - struct evbuffer *buf = evcon->input_buffer; - int n, len; - if (what == EV_TIMEOUT) { - evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); - return; - } - n = evbuffer_read(buf, fd, -1); - len = EVBUFFER_LENGTH(buf); - event_debug(("%s: got %d on %d\n", __func__, n, fd)); - - if (n == -1) { - event_debug(("%s: evbuffer_read", __func__)); - evhttp_connection_fail(evcon, EVCON_HTTP_EOF); - return; - } else if (n == 0) { - /* Connection closed */ - evhttp_connection_done(evcon); - return; - } evhttp_read_body(evcon, req); } @@ -852,9 +815,12 @@ evhttp_connection_free(struct evhttp_connection *evcon) if (event_initialized(&evcon->close_ev)) event_del(&evcon->close_ev); - if (event_initialized(&evcon->ev)) - event_del(&evcon->ev); - + if (event_initialized(&evcon->retry_ev)) + event_del(&evcon->retry_ev); + + if (evcon->bufev != NULL) + bufferevent_free(evcon->bufev); + if (evcon->fd != -1) EVUTIL_CLOSESOCKET(evcon->fd); @@ -864,12 +830,6 @@ evhttp_connection_free(struct evhttp_connection *evcon) if (evcon->address != NULL) mm_free(evcon->address); - if (evcon->input_buffer != NULL) - evbuffer_free(evcon->input_buffer); - - if (evcon->output_buffer != NULL) - evbuffer_free(evcon->output_buffer); - mm_free(evcon); } @@ -910,8 +870,7 @@ evhttp_request_dispatch(struct evhttp_connection* evcon) void evhttp_connection_reset(struct evhttp_connection *evcon) { - if (event_initialized(&evcon->ev)) - event_del(&evcon->ev); + bufferevent_disable(evcon->bufev, EV_READ|EV_WRITE); if (evcon->fd != -1) { /* inform interested parties about connection close */ @@ -924,7 +883,7 @@ evhttp_connection_reset(struct evhttp_connection *evcon) evcon->state = EVCON_DISCONNECTED; /* remove unneeded flags */ - evcon->flags &= ~EVHTTP_CON_CLOSEDETECT; + evcon->flags &= ~(EVHTTP_CON_CLOSEDETECT|EVHTTP_CON_GOTHEADERS); } static void @@ -964,23 +923,82 @@ evhttp_connection_retry(evutil_socket_t fd, short what, void *arg) evhttp_connection_connect(evcon); } +static void +evhttp_connection_cb_cleanup(struct evhttp_connection *evcon) +{ + if (evcon->retry_max < 0 || evcon->retry_cnt < evcon->retry_max) { + evtimer_set(&evcon->retry_ev, evhttp_connection_retry, evcon); + EVHTTP_BASE_SET(evcon, &evcon->retry_ev); + evhttp_add_event(&evcon->retry_ev, + MIN(3600, 2 << evcon->retry_cnt), + HTTP_CONNECT_TIMEOUT); + evcon->retry_cnt++; + return; + } + evhttp_connection_reset(evcon); + + /* for now, we just signal all requests by executing their callbacks */ + while (TAILQ_FIRST(&evcon->requests) != NULL) { + struct evhttp_request *request = TAILQ_FIRST(&evcon->requests); + TAILQ_REMOVE(&evcon->requests, request, next); + request->evcon = NULL; + + /* we might want to set an error here */ + request->cb(request, request->cb_arg); + evhttp_request_free(request); + } +} + +static void +evhttp_error_cb(struct bufferevent *bufev, short what, void *arg) +{ + struct evhttp_connection *evcon = arg; + + switch (evcon->state) { + case EVCON_CONNECTING: + if (what == EVBUFFER_TIMEOUT) { + event_debug(("%s: connection timeout for \"%s:%d\" on %d", + __func__, evcon->address, evcon->port, + evcon->fd)); + evhttp_connection_cb_cleanup(evcon); + return; + } + break; + + case EVCON_CONNECTED: + if (what == (EVBUFFER_READ|EVBUFFER_EOF) && + (evcon->flags & EVHTTP_CON_GOTHEADERS)) { + /* EOF on read can be benign */ + evhttp_connection_done(evcon); + return; + } + break; + + case EVCON_DISCONNECTED: + default: + break; + } + + if (what & EVBUFFER_TIMEOUT) { + evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); + } else if (what & EVBUFFER_EOF) { + evhttp_connection_fail(evcon, EVCON_HTTP_EOF); + } else { + evhttp_connection_fail(evcon, EVCON_HTTP_BUFFER_ERROR); + } +} + /* * Call back for asynchronous connection attempt. */ static void -evhttp_connectioncb(evutil_socket_t fd, short what, void *arg) +evhttp_connection_cb(struct bufferevent *bufev, void *arg) { struct evhttp_connection *evcon = arg; int error; socklen_t errsz = sizeof(error); - if (what == EV_TIMEOUT) { - event_debug(("%s: connection timeout for \"%s:%d\" on %d", - __func__, evcon->address, evcon->port, evcon->fd)); - goto cleanup; - } - /* Check if the connection completed */ if (getsockopt(evcon->fd, SOL_SOCKET, SO_ERROR, (void*)&error, &errsz) == -1) { @@ -1003,32 +1021,28 @@ evhttp_connectioncb(evutil_socket_t fd, short what, void *arg) /* Reset the retry count as we were successful in connecting */ evcon->retry_cnt = 0; evcon->state = EVCON_CONNECTED; + evcon->flags &= ~EVHTTP_CON_GOTHEADERS; + + /* reset the bufferevent cbs */ + bufferevent_setcb(evcon->bufev, + evhttp_read_header_cb, + evhttp_write_cb, + evhttp_error_cb, + evcon); + + if (evcon->timeout == -1) + bufferevent_settimeout(evcon->bufev, + HTTP_READ_TIMEOUT, HTTP_WRITE_TIMEOUT); + else + bufferevent_settimeout(evcon->bufev, + evcon->timeout, evcon->timeout); /* try to start requests that have queued up on this connection */ evhttp_request_dispatch(evcon); return; cleanup: - if (evcon->retry_max < 0 || evcon->retry_cnt < evcon->retry_max) { - evtimer_set(&evcon->ev, evhttp_connection_retry, evcon); - EVHTTP_BASE_SET(evcon, &evcon->ev); - evhttp_add_event(&evcon->ev, MIN(3600, 2 << evcon->retry_cnt), - HTTP_CONNECT_TIMEOUT); - evcon->retry_cnt++; - return; - } - evhttp_connection_reset(evcon); - - /* for now, we just signal all requests by executing their callbacks */ - while (TAILQ_FIRST(&evcon->requests) != NULL) { - struct evhttp_request *request = TAILQ_FIRST(&evcon->requests); - TAILQ_REMOVE(&evcon->requests, request, next); - request->evcon = NULL; - - /* we might want to set an error here */ - request->cb(request, request->cb_arg); - evhttp_request_free(request); - } + evhttp_connection_cb_cleanup(evcon); } /* @@ -1366,32 +1380,15 @@ evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req) evhttp_read_body(evcon, req); } -void -evhttp_read_header(evutil_socket_t fd, short what, void *arg) +static void +evhttp_read_header_cb(struct bufferevent *bufev, void *arg) { struct evhttp_connection *evcon = arg; struct evhttp_request *req = TAILQ_FIRST(&evcon->requests); - int n, res; + int res; + int fd = evcon->fd; - if (what == EV_TIMEOUT) { - event_debug(("%s: timeout on %d\n", __func__, fd)); - evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); - return; - } - - n = evbuffer_read(evcon->input_buffer, fd, -1); - if (n == 0) { - event_debug(("%s: no more data on %d", __func__, fd)); - evhttp_connection_fail(evcon, EVCON_HTTP_EOF); - return; - } - if (n == -1) { - event_debug(("%s: bad read on %d", __func__, fd)); - evhttp_connection_fail(evcon, EVCON_HTTP_EOF); - return; - } - - res = evhttp_parse_lines(req, evcon->input_buffer); + res = evhttp_parse_lines(req, bufferevent_input(evcon->bufev)); if (res == -1) { /* Error while reading, terminate */ event_debug(("%s: bad header lines on %d\n", __func__, fd)); @@ -1399,11 +1396,15 @@ evhttp_read_header(evutil_socket_t fd, short what, void *arg) return; } else if (res == 0) { /* Need more header lines */ - evhttp_add_event(&evcon->ev, - evcon->timeout, HTTP_READ_TIMEOUT); return; } + /* Disable reading for now */ + bufferevent_disable(evcon->bufev, EV_READ); + + /* we got all headers */ + evcon->flags |= EVHTTP_CON_GOTHEADERS; + /* Done reading headers, do the real work */ switch (req->kind) { case EVHTTP_REQUEST: @@ -1466,16 +1467,14 @@ evhttp_connection_new(const char *address, unsigned short port) goto error; } - if ((evcon->input_buffer = evbuffer_new()) == NULL) { - event_warn("%s: evbuffer_new failed", __func__); + if ((evcon->bufev = bufferevent_new(-1, + evhttp_read_header_cb, + evhttp_write_cb, + evhttp_error_cb, evcon)) == NULL) { + event_warn("%s: bufferevent_new failed", __func__); goto error; } - if ((evcon->output_buffer = evbuffer_new()) == NULL) { - event_warn("%s: evbuffer_new failed", __func__); - goto error; - } - evcon->state = EVCON_DISCONNECTED; TAILQ_INIT(&evcon->requests); @@ -1493,6 +1492,7 @@ void evhttp_connection_set_base(struct evhttp_connection *evcon, assert(evcon->base == NULL); assert(evcon->state == EVCON_DISCONNECTED); evcon->base = base; + bufferevent_base_set(base, evcon->bufev); } void @@ -1500,6 +1500,13 @@ evhttp_connection_set_timeout(struct evhttp_connection *evcon, int timeout_in_secs) { evcon->timeout = timeout_in_secs; + + if (evcon->timeout == -1) + bufferevent_settimeout(evcon->bufev, + HTTP_READ_TIMEOUT, HTTP_WRITE_TIMEOUT); + else + bufferevent_settimeout(evcon->bufev, + evcon->timeout, evcon->timeout); } void @@ -1549,9 +1556,15 @@ evhttp_connection_connect(struct evhttp_connection *evcon) } /* Set up a callback for successful connection setup */ - event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_connectioncb, evcon); - EVHTTP_BASE_SET(evcon, &evcon->ev); - evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_CONNECT_TIMEOUT); + bufferevent_setfd(evcon->bufev, evcon->fd); + bufferevent_setcb(evcon->bufev, + NULL /* evhttp_read_cb */, + evhttp_connection_cb, + evhttp_error_cb, evcon); + bufferevent_settimeout(evcon->bufev, 0, + evcon->timeout != -1 ? evcon->timeout : HTTP_CONNECT_TIMEOUT); + /* make sure that we get a write callback */ + bufferevent_enable(evcon->bufev, EV_WRITE); evcon->state = EVCON_CONNECTING; @@ -1613,12 +1626,8 @@ void evhttp_start_read(struct evhttp_connection *evcon) { /* Set up an event to read the headers */ - if (event_initialized(&evcon->ev)) - event_del(&evcon->ev); - event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read_header, evcon); - EVHTTP_BASE_SET(evcon, &evcon->ev); - - evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT); + bufferevent_disable(evcon->bufev, EV_WRITE); + bufferevent_enable(evcon->bufev, EV_READ); } static void @@ -1729,13 +1738,14 @@ evhttp_send_reply_start(struct evhttp_request *req, int code, void evhttp_send_reply_chunk(struct evhttp_request *req, struct evbuffer *databuf) { + struct evbuffer *output = bufferevent_output(req->evcon->bufev); if (req->chunked) { - evbuffer_add_printf(req->evcon->output_buffer, "%x\r\n", + evbuffer_add_printf(output, "%x\r\n", (unsigned)EVBUFFER_LENGTH(databuf)); } - evbuffer_add_buffer(req->evcon->output_buffer, databuf); + evbuffer_add_buffer(output, databuf); if (req->chunked) { - evbuffer_add(req->evcon->output_buffer, "\r\n", 2); + evbuffer_add(output, "\r\n", 2); } evhttp_write_buffer(req->evcon, NULL, NULL); } @@ -1744,12 +1754,13 @@ void evhttp_send_reply_end(struct evhttp_request *req) { struct evhttp_connection *evcon = req->evcon; + struct evbuffer *output = bufferevent_output(evcon->bufev); if (req->chunked) { - evbuffer_add(req->evcon->output_buffer, "0\r\n\r\n", 5); + evbuffer_add(output, "0\r\n\r\n", 5); evhttp_write_buffer(req->evcon, evhttp_send_done, NULL); req->chunked = 0; - } else if (!event_pending(&evcon->ev, EV_WRITE|EV_TIMEOUT, NULL)) { + } else if (EVBUFFER_LENGTH(output) == 0) { /* let the connection know that we are done with the request */ evhttp_send_done(evcon, NULL); } else { @@ -2291,14 +2302,17 @@ evhttp_get_request_connection( if (evcon == NULL) return (NULL); - /* associate the base if we have one*/ - evhttp_connection_set_base(evcon, http->base); + /* associate the base if we have one */ + if (http->base != NULL) + evhttp_connection_set_base(evcon, http->base); evcon->flags |= EVHTTP_CON_INCOMING; evcon->state = EVCON_CONNECTED; evcon->fd = fd; + bufferevent_setfd(evcon->bufev, fd); + return (evcon); }