diff --git a/buffer.c b/buffer.c index 41fc71d1..f5f8a2c0 100644 --- a/buffer.c +++ b/buffer.c @@ -80,10 +80,12 @@ #include "event2/event.h" #include "event2/buffer.h" #include "event2/buffer_compat.h" +#include "event2/thread.h" #include "event-config.h" #include "log-internal.h" #include "mm-internal.h" #include "util-internal.h" +#include "evthread-internal.h" #include "evbuffer-internal.h" /* some systems do not have MAP_FAILED */ @@ -199,6 +201,7 @@ evbuffer_chain_free(struct evbuffer_chain *chain) static inline void evbuffer_chain_insert(struct evbuffer *buf, struct evbuffer_chain *chain) { + ASSERT_EVBUFFER_LOCKED(buf); if (buf->first == NULL) { buf->first = buf->last = chain; buf->previous_to_last = NULL; @@ -230,6 +233,30 @@ evbuffer_new(void) return (buffer); } +int +evbuffer_enable_locking(struct evbuffer *buf, void *lock) +{ +#ifdef _EVENT_DISABLE_THREAD_SUPPORT + return -1; +#else + if (buf->lock) + return -1; + + if (!lock) { + EVTHREAD_ALLOC_LOCK(lock); + if (!lock) + return -1; + buf->lock = lock; + buf->own_lock = 1; + } else { + buf->lock = lock; + buf->own_lock = 0; + } + + return 0; +#endif +} + static inline void evbuffer_invoke_callbacks(struct evbuffer *buffer) { @@ -237,6 +264,8 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer) struct evbuffer_cb_info info; size_t new_size; + ASSERT_EVBUFFER_LOCKED(buffer); + if (TAILQ_EMPTY(&buffer->callbacks)) { buffer->n_add_for_cb = buffer->n_del_for_cb = 0; return; @@ -276,6 +305,7 @@ static void evbuffer_remove_all_callbacks(struct evbuffer *buffer) { struct evbuffer_cb_entry *cbent; + while ((cbent = TAILQ_FIRST(&buffer->callbacks))) { TAILQ_REMOVE(&buffer->callbacks, cbent, next); mm_free(cbent); @@ -286,57 +316,104 @@ void evbuffer_free(struct evbuffer *buffer) { struct evbuffer_chain *chain, *next; + + ASSERT_EVBUFFER_UNLOCKED(buffer); + for (chain = buffer->first; chain != NULL; chain = next) { next = chain->next; evbuffer_chain_free(chain); } evbuffer_remove_all_callbacks(buffer); + if (buffer->own_lock) + EVTHREAD_FREE_LOCK(buffer->lock); mm_free(buffer); } +void +evbuffer_lock(struct evbuffer *buf) +{ + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); +} + +void +evbuffer_unlock(struct evbuffer *buf) +{ + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); +} + size_t evbuffer_get_length(const struct evbuffer *buffer) { - return (buffer->total_len); + size_t result; + + EVBUFFER_LOCK(buffer, EVTHREAD_READ); + + result = (buffer->total_len); + + EVBUFFER_UNLOCK(buffer, EVTHREAD_READ); + + return result; } size_t evbuffer_get_contiguous_space(const struct evbuffer *buf) { - struct evbuffer_chain *chain = buf->first; + struct evbuffer_chain *chain; + size_t result; - return (chain != NULL ? chain->off : 0); + EVBUFFER_LOCK(buf, EVTHREAD_READ); + chain = buf->first; + result = (chain != NULL ? chain->off : 0); + EVBUFFER_UNLOCK(buf, EVTHREAD_READ); + + return result; } unsigned char * evbuffer_reserve_space(struct evbuffer *buf, size_t size) { struct evbuffer_chain *chain; + unsigned char *result = NULL; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); if (evbuffer_expand(buf, size) == -1) - return (NULL); + goto done; chain = buf->last; - return (chain->buffer + chain->misalign + chain->off); + result = (chain->buffer + chain->misalign + chain->off); + +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + + return result; } int evbuffer_commit_space(struct evbuffer *buf, size_t size) { - struct evbuffer_chain *chain = buf->last; + struct evbuffer_chain *chain; + int result = -1; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + chain = buf->last; if (chain == NULL || chain->buffer_len - chain->off - chain->misalign < size) - return (-1); + goto done; chain->off += size; buf->total_len += size; - return (0); + result = 0; +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } #define ZERO_CHAIN(dst) do { \ + ASSERT_EVBUFFER_LOCKED(dst); \ (dst)->first = NULL; \ (dst)->last = NULL; \ (dst)->previous_to_last = NULL; \ @@ -344,6 +421,8 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size) } while (0) #define COPY_CHAIN(dst, src) do { \ + ASSERT_EVBUFFER_LOCKED(dst); \ + ASSERT_EVBUFFER_LOCKED(src); \ (dst)->first = (src)->first; \ (dst)->previous_to_last = (src)->previous_to_last; \ (dst)->last = (src)->last; \ @@ -351,6 +430,8 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size) } while (0) #define APPEND_CHAIN(dst, src) do { \ + ASSERT_EVBUFFER_LOCKED(dst); \ + ASSERT_EVBUFFER_LOCKED(src); \ (dst)->last->next = (src)->first; \ (dst)->previous_to_last = (src)->previous_to_last ? \ (src)->previous_to_last : (dst)->last; \ @@ -358,23 +439,29 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size) (dst)->total_len += (src)->total_len; \ } while (0) -#define PREPEND_CHAIN(dst, src) do { \ - (src)->last->next = (dst)->first; \ - (dst)->first = (src)->first; \ - (dst)->total_len += (src)->total_len; \ - if ((dst)->previous_to_last == NULL) \ - (dst)->previous_to_last = (src)->last; \ +#define PREPEND_CHAIN(dst, src) do { \ + ASSERT_EVBUFFER_LOCKED(dst); \ + ASSERT_EVBUFFER_LOCKED(src); \ + (src)->last->next = (dst)->first; \ + (dst)->first = (src)->first; \ + (dst)->total_len += (src)->total_len; \ + if ((dst)->previous_to_last == NULL) \ + (dst)->previous_to_last = (src)->last; \ } while (0) int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) { - size_t in_total_len = inbuf->total_len; - size_t out_total_len = outbuf->total_len; + size_t in_total_len, out_total_len; + + EVBUFFER_LOCK2(inbuf, outbuf); + + in_total_len = inbuf->total_len; + out_total_len = outbuf->total_len; if (in_total_len == 0 || outbuf == inbuf) - return (0); + goto done; if (out_total_len == 0) { COPY_CHAIN(outbuf, inbuf); @@ -390,17 +477,23 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) evbuffer_invoke_callbacks(inbuf); evbuffer_invoke_callbacks(outbuf); +done: + EVBUFFER_UNLOCK2(inbuf, outbuf); return (0); } void evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) { - size_t in_total_len = inbuf->total_len; - size_t out_total_len = outbuf->total_len; + size_t in_total_len, out_total_len; + + EVBUFFER_LOCK2(inbuf, outbuf); + + in_total_len = inbuf->total_len; + out_total_len = outbuf->total_len; if (!in_total_len || inbuf == outbuf) - return; + goto done; if (out_total_len == 0) { COPY_CHAIN(outbuf, inbuf); @@ -415,16 +508,21 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) evbuffer_invoke_callbacks(inbuf); evbuffer_invoke_callbacks(outbuf); +done: + EVBUFFER_UNLOCK2(inbuf, outbuf); } void evbuffer_drain(struct evbuffer *buf, size_t len) { struct evbuffer_chain *chain, *next; - size_t old_len = buf->total_len; + size_t old_len; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + old_len = buf->total_len; if (old_len == 0) - return; + goto done; if (len >= old_len) { len = old_len; @@ -455,6 +553,9 @@ evbuffer_drain(struct evbuffer *buf, size_t len) buf->n_del_for_cb += len; /* Tell someone about changes in this buffer */ evbuffer_invoke_callbacks(buf); + +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); } /* Reads data from an event buffer and drains the bytes read */ @@ -463,15 +564,20 @@ int evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen) { /*XXX fails badly on sendfile case. */ - struct evbuffer_chain *chain = buf->first, *tmp; + struct evbuffer_chain *chain, *tmp; char *data = data_out; size_t nread; + int result = 0; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + + chain = buf->first; if (datlen >= buf->total_len) datlen = buf->total_len; if (datlen == 0) - return (0); + goto done; nread = datlen; @@ -503,7 +609,10 @@ evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen) if (nread) evbuffer_invoke_callbacks(buf); - return (nread); + result = nread; +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } /* reads data from the src buffer to the dst buffer, avoids memcpy as @@ -515,18 +624,25 @@ evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, /*XXX We should have an option to force this to be zero-copy.*/ /*XXX can fail badly on sendfile case. */ - struct evbuffer_chain *chain = src->first; - struct evbuffer_chain *previous = chain, *previous_to_previous = NULL; + struct evbuffer_chain *chain, *previous, *previous_to_previous = NULL; size_t nread = 0; + int result; - if (datlen == 0 || dst == src) - return (0); + EVBUFFER_LOCK2(src, dst); + + chain = previous = src->first; + + if (datlen == 0 || dst == src) { + result = 0; + goto done; + } /* short-cut if there is no more data buffered */ if (datlen >= src->total_len) { datlen = src->total_len; evbuffer_add_buffer(dst, src); - return (datlen); + result = datlen; + goto done; } /* removes chains if possible */ @@ -570,15 +686,22 @@ evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, evbuffer_invoke_callbacks(dst); evbuffer_invoke_callbacks(src); } + result = nread; - return (nread); +done: + EVBUFFER_UNLOCK2(src, dst); + return result; } unsigned char * evbuffer_pullup(struct evbuffer *buf, ssize_t size) { - struct evbuffer_chain *chain = buf->first, *next, *tmp; - unsigned char *buffer; + struct evbuffer_chain *chain, *next, *tmp; + unsigned char *buffer, *result = NULL; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + + chain = buf->first; if (size == -1) size = buf->total_len; @@ -586,12 +709,14 @@ evbuffer_pullup(struct evbuffer *buf, ssize_t size) * is going to have a long enough buffer afterwards; so we return * NULL */ if (size == 0 || size > buf->total_len) - return (NULL); + goto done; /* No need to pull up anything; the first size bytes are * already here. */ - if (chain->off >= size) - return chain->buffer + chain->misalign; + if (chain->off >= size) { + result = chain->buffer + chain->misalign; + goto done; + } if (chain->buffer_len - chain->misalign >= size) { /* already have enough space in the first chain */ @@ -604,7 +729,7 @@ evbuffer_pullup(struct evbuffer *buf, ssize_t size) } else { if ((tmp = evbuffer_chain_new(size)) == NULL) { event_warn("%s: out of memory\n", __func__); - return (NULL); + goto done; } buffer = tmp->buffer; tmp->off = size; @@ -638,7 +763,11 @@ evbuffer_pullup(struct evbuffer *buf, ssize_t size) tmp->next = chain; - return (tmp->buffer + tmp->misalign); + result = (tmp->buffer + tmp->misalign); + +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } /* @@ -754,6 +883,9 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, char *line, chr; unsigned int n_to_copy, extra_drain; int count = 0; + char *result = NULL; + + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); it.chain = buffer->first; it.off = 0; @@ -764,7 +896,7 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, case EVBUFFER_EOL_ANY: count = evbuffer_strpbrk(&it, "\r\n"); if (count == -1) - return (NULL); + goto done; n_to_copy = count; extra_drain = evbuffer_strspn(it.chain, it.off, "\r\n"); @@ -775,7 +907,7 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, count += tmp; ++it.off; if (evbuffer_getchr(&it, &chr) == -1) - return (NULL); + goto done; if (chr == '\n') { n_to_copy = count; break; @@ -783,7 +915,7 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, ++count; } if (tmp == -1) - return (NULL); + goto done; extra_drain = 2; break; } @@ -791,18 +923,18 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, /* we might strip a preceding '\r' */ case EVBUFFER_EOL_LF: if ((count = evbuffer_strchr(&it, '\n')) == -1) - return (NULL); + goto done; n_to_copy = count; extra_drain = 1; break; default: - return (NULL); + goto done; } if ((line = mm_malloc(n_to_copy+1)) == NULL) { event_warn("%s: out of memory\n", __func__); evbuffer_drain(buffer, n_to_copy + extra_drain); - return (NULL); + goto done; } evbuffer_remove(buffer, line, n_to_copy); @@ -815,7 +947,10 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, if (n_read_out) *n_read_out = (size_t)n_to_copy; - return (line); + result = line; +done: + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); + return result; } #define EVBUFFER_CHAIN_MAX_AUTO_SIZE 4096 @@ -825,15 +960,20 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, int evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) { - struct evbuffer_chain *chain = buf->last, *tmp; + struct evbuffer_chain *chain, *tmp; const unsigned char *data = data_in; size_t remain, to_alloc; + int result = -1; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + + chain = buf->last; /* If there are no chains allocated for this buffer, allocate one * big enough to hold all the data. */ if (chain == NULL) { if (evbuffer_expand(buf, datlen) == -1) - return (-1); + goto done; chain = buf->last; } @@ -874,7 +1014,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) to_alloc = datlen; tmp = evbuffer_chain_new(to_alloc); if (tmp == NULL) - return (-1); + goto done; if (remain) { memcpy(chain->buffer + chain->misalign + chain->off, @@ -893,18 +1033,24 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) out: evbuffer_invoke_callbacks(buf); - - return (0); + result = 0; +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) { - struct evbuffer_chain *chain = buf->first, *tmp; + struct evbuffer_chain *chain, *tmp; + int result = -1; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + chain = buf->first; if (chain == NULL) { if (evbuffer_expand(buf, datlen) == -1) - return (-1); + goto done; chain = buf->first; chain->misalign = chain->buffer_len; } @@ -934,7 +1080,7 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) /* we need to add another chain */ if ((tmp = evbuffer_chain_new(datlen)) == NULL) - return (-1); + goto done; buf->first = tmp; if (buf->previous_to_last == NULL) buf->previous_to_last = tmp; @@ -949,8 +1095,10 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) out: evbuffer_invoke_callbacks(buf); - - return (0); + result = 0; +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } /** Helper: realigns the memory in chain->buffer so that misalign is 0. */ @@ -969,23 +1117,28 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen) { /* XXX we should either make this function less costly, or call it * less often. */ - struct evbuffer_chain *chain = buf->last, *tmp; + struct evbuffer_chain *chain, *tmp; size_t need, length; + int result = -1; + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + + chain = buf->last; if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE)) { chain = evbuffer_chain_new(datlen); if (chain == NULL) - return (-1); + goto err; evbuffer_chain_insert(buf, chain); - return (0); + goto ok; } need = chain->misalign + chain->off + datlen; /* If we can fit all the data, then we don't have to do anything */ if (chain->buffer_len >= need) - return (0); + goto ok; /* If the misalignment plus the remaining space fulfils our * data needs, we just force an alignment to happen. @@ -993,14 +1146,14 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen) */ if (chain->buffer_len - chain->off >= datlen) { evbuffer_chain_align(chain); - return (0); + goto ok; } /* figure out how much space we need */ length = chain->buffer_len - chain->misalign + datlen; tmp = evbuffer_chain_new(length); if (tmp == NULL) - return (-1); + goto err; /* copy the data over that we had so far */ tmp->off = chain->off; tmp->misalign = 0; @@ -1015,7 +1168,11 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen) evbuffer_chain_free(chain); - return (0); +ok: + result = 0; +err: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } /* Make sure that datlen bytes are available for writing in the last two @@ -1026,6 +1183,8 @@ _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen) struct evbuffer_chain *chain = buf->last, *tmp; size_t avail, avail_in_prev = 0; + ASSERT_EVBUFFER_LOCKED(buf); + if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE)) { chain = evbuffer_chain_new(datlen); if (chain == NULL) @@ -1121,15 +1280,21 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) { struct evbuffer_chain *chain = buf->last; int n = EVBUFFER_MAX_READ; + int result; + #ifdef USE_IOVEC_IMPL int nvecs; #else unsigned char *p; #endif +#if defined(FIONREAD) && defined(WIN32) + long lng = n; +#endif + + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); #if defined(FIONREAD) #ifdef WIN32 - long lng = n; if (ioctlsocket(fd, FIONREAD, &lng) == -1 || (n=lng) == 0) { #else if (ioctl(fd, FIONREAD, &n) == -1 || n == 0) { @@ -1156,7 +1321,8 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) /* Since we can use iovecs, we're willing to use the last * _two_ chains. */ if (_evbuffer_expand_fast(buf, howmuch) == -1) { - return(-1); + result = -1; + goto done; } else { IOV_TYPE vecs[2]; chain = buf->last; @@ -1214,8 +1380,10 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) /* If we don't have FIONREAD, we might waste some space here */ /* XXX we _will_ waste some space here if there is any space left * over on buf->last. */ - if (evbuffer_expand(buf, howmuch) == -1) - return (-1); + if (evbuffer_expand(buf, howmuch) == -1) { + result = -1; + goto done; + } chain = buf->last; @@ -1229,15 +1397,19 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) #endif #endif /* USE_IOVEC_IMPL */ - if (n == -1) - return (-1); - if (n == 0) - return (0); + if (n == -1) { + result = -1; + goto done; + } + if (n == 0) { + result = 0; + goto done; + } #ifdef USE_IOVEC_IMPL if (nvecs == 2) { size_t space = CHAIN_SPACE_LEN(buf->previous_to_last); - if (space < n) { + if (space < n) { buf->previous_to_last->off += space; chain->off += n-space; } else { @@ -1254,8 +1426,10 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) /* Tell someone about changes in this buffer */ evbuffer_invoke_callbacks(buf); - - return (n); + result = n; +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } #ifdef USE_IOVEC_IMPL @@ -1266,6 +1440,8 @@ ssize_t howmuch) IOV_TYPE iov[NUM_IOVEC]; struct evbuffer_chain *chain = buffer->first; int n, i = 0; + + ASSERT_EVBUFFER_LOCKED(buffer); /* XXX make this top out at some maximal data length? if the * buffer has (say) 1MB in it, split over 128 chains, there's * no way it all gets written in one go. */ @@ -1311,6 +1487,14 @@ evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd, #ifdef SENDFILE_IS_FREEBSD int res; off_t len = chain->off; +#elif SENDFILE_IS_LINUX + ssize_t res; + off_t offset = chain->misalign; +#endif + + ASSERT_EVBUFFER_LOCKED(buffer); + +#ifdef SENDFILE_IS_FREEBSD res = sendfile(info->fd, fd, chain->misalign, &len, NULL, 0); if (res == -1 && !EVUTIL_ERR_RW_RETRIABLE(errno)) return (-1); @@ -1318,8 +1502,6 @@ evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd, return (len); #elif SENDFILE_IS_LINUX /* TODO(niels): implement splice */ - ssize_t res; - off_t offset = chain->misalign; res = sendfile(fd, info->fd, &offset, chain->off); if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) { /* if this is EGAIN or EINTR return 0; otherwise, -1 */ @@ -1336,6 +1518,8 @@ evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd, { int n; + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); + if (howmuch < 0) howmuch = buffer->total_len; @@ -1359,12 +1543,10 @@ evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd, #endif } - if (n == -1) - return (-1); - if (n == 0) - return (0); - evbuffer_drain(buffer, n); + if (n > 0) + evbuffer_drain(buffer, n); + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); return (n); } @@ -1380,12 +1562,17 @@ evbuffer_find(struct evbuffer *buffer, const unsigned char *what, size_t len) unsigned char *search; struct evbuffer_ptr ptr; - ptr = evbuffer_search(buffer, (const char *)what, len, NULL); - if (ptr.pos < 0) - return (NULL); + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); - search = evbuffer_pullup(buffer, ptr.pos + len); - return search + ptr.pos; + ptr = evbuffer_search(buffer, (const char *)what, len, NULL); + if (ptr.pos < 0) { + search = NULL; + } else { + search = evbuffer_pullup(buffer, ptr.pos + len); + search += ptr.pos; + } + EVBUFFER_UNLOCK(buffer,EVTHREAD_WRITE); + return search; } int @@ -1395,6 +1582,8 @@ evbuffer_ptr_set(struct evbuffer *buf, struct evbuffer_ptr *pos, size_t left = position; struct evbuffer_chain *chain = NULL; + EVBUFFER_LOCK(buf, EVTHREAD_READ); + switch (how) { case EVBUFFER_PTR_SET: chain = buf->first; @@ -1423,6 +1612,8 @@ evbuffer_ptr_set(struct evbuffer *buf, struct evbuffer_ptr *pos, pos->pos = -1; } + EVBUFFER_UNLOCK(buf, EVTHREAD_READ); + return chain != NULL ? 0 : -1; } @@ -1438,6 +1629,8 @@ evbuffer_ptr_memcmp(const struct evbuffer *buf, const struct evbuffer_ptr *pos, size_t position; int r; + ASSERT_EVBUFFER_LOCKED(buf); + if (pos->pos + len > buf->total_len) return -1; @@ -1470,6 +1663,8 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str const unsigned char *p; char first; + EVBUFFER_LOCK(buffer, EVTHREAD_READ); + if (start) { memcpy(&pos, start, sizeof(pos)); chain = pos._internal.chain; @@ -1480,7 +1675,7 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str } if (!len) - return pos; + goto done; first = what[0]; @@ -1494,7 +1689,7 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str pos.pos += p - start_at; pos._internal.pos_in_chain += p - start_at; if (!evbuffer_ptr_memcmp(buffer, &pos, what, len)) - return pos; + goto done; ++pos.pos; ++pos._internal.pos_in_chain; if (pos._internal.pos_in_chain == chain->off) { @@ -1510,6 +1705,8 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str pos.pos = -1; pos._internal.chain = NULL; +done: + EVBUFFER_UNLOCK(buffer, EVTHREAD_READ); return pos; } @@ -1519,12 +1716,14 @@ evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap) { char *buffer; size_t space; - int sz; + int sz, result = -1; va_list aq; + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + /* make sure that at least some space is available */ if (evbuffer_expand(buf, 64) == -1) - return (-1); + goto done; for (;;) { struct evbuffer_chain *chain = buf->last; @@ -1543,20 +1742,24 @@ evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap) va_end(aq); if (sz < 0) - return (-1); + goto done; if (sz < space) { chain->off += sz; buf->total_len += sz; buf->n_add_for_cb += sz; evbuffer_invoke_callbacks(buf); - return (sz); + result = sz; + goto done; } if (evbuffer_expand(buf, sz + 1) == -1) - return (-1); - - } + goto done; + } /* NOTREACHED */ + +done: + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); + return result; } int @@ -1592,10 +1795,12 @@ evbuffer_add_reference(struct evbuffer *outbuf, info->cleanupfn = cleanupfn; info->extra = extra; + EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE); evbuffer_chain_insert(outbuf, chain); outbuf->n_add_for_cb += datlen; evbuffer_invoke_callbacks(outbuf); + EVBUFFER_UNLOCK(outbuf, EVTHREAD_WRITE); return (0); } @@ -1633,6 +1838,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain); info->fd = fd; + EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE); outbuf->n_add_for_cb += length; evbuffer_chain_insert(outbuf, chain); } else @@ -1671,6 +1877,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain); info->fd = fd; + EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE); outbuf->n_add_for_cb += length; evbuffer_chain_insert(outbuf, chain); @@ -1705,6 +1912,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, length -= read; } + EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE); evbuffer_add_buffer(outbuf, tmp); evbuffer_free(tmp); @@ -1712,6 +1920,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, } evbuffer_invoke_callbacks(outbuf); + EVBUFFER_UNLOCK(outbuf, EVTHREAD_WRITE); return (0); } @@ -1720,6 +1929,8 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, void evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg) { + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); + if (!TAILQ_EMPTY(&buffer->callbacks)) evbuffer_remove_all_callbacks(buffer); @@ -1729,6 +1940,7 @@ evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg) ent->cb.cb_obsolete = cb; ent->flags |= EVBUFFER_CB_OBSOLETE; } + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); } struct evbuffer_cb_entry * @@ -1737,10 +1949,12 @@ evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg) struct evbuffer_cb_entry *e; if (! (e = mm_calloc(1, sizeof(struct evbuffer_cb_entry)))) return NULL; + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); e->cb.cb_func = cb; e->cbarg = cbarg; e->flags = EVBUFFER_CB_ENABLED; TAILQ_INSERT_HEAD(&buffer->callbacks, e, next); + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); return e; } @@ -1748,7 +1962,9 @@ int evbuffer_remove_cb_entry(struct evbuffer *buffer, struct evbuffer_cb_entry *ent) { + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); TAILQ_REMOVE(&buffer->callbacks, ent, next); + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); mm_free(ent); return 0; } @@ -1757,20 +1973,26 @@ int evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg) { struct evbuffer_cb_entry *cbent; + int result = -1; + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); TAILQ_FOREACH(cbent, &buffer->callbacks, next) { if (cb == cbent->cb.cb_func && cbarg == cbent->cbarg) { - return evbuffer_remove_cb_entry(buffer, cbent); + result = evbuffer_remove_cb_entry(buffer, cbent); + goto done; } } - return -1; +done: + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); + return result; } int evbuffer_cb_set_flags(struct evbuffer *buffer, struct evbuffer_cb_entry *cb, ev_uint32_t flags) { - (void)buffer; /* unused */ + EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); cb->flags = (cb->flags & EVBUFFER_CB_INTERNAL_FLAGS) | flags; + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); return 0; } diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 2751a472..41128cfc 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -72,6 +72,12 @@ struct evbuffer { size_t n_add_for_cb; size_t n_del_for_cb; +#ifndef _EVENT_DISABLE_THREAD_SUPPORT + void *lock; +#endif + unsigned own_lock : 1; + int lock_count : 31; + TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; }; @@ -125,6 +131,50 @@ struct evbuffer_chain_reference { #define EVBUFFER_CHAIN_SIZE sizeof(struct evbuffer_chain) #define EVBUFFER_CHAIN_EXTRA(t, c) (t *)((struct evbuffer_chain *)(c) + 1) +#define ASSERT_EVBUFFER_LOCKED(buffer) \ + do { \ + assert((buffer)->lock_count > 0); \ + } while (0) +#define ASSERT_EVBUFFER_UNLOCKED(buffer) \ + do { \ + assert((buffer)->lock_count == 0); \ + } while (0) +#define _EVBUFFER_INCREMENT_LOCK_COUNT(buffer) \ + do { \ + ((struct evbuffer*)(buffer))->lock_count++; \ + } while (0) +#define _EVBUFFER_DECREMENT_LOCK_COUNT(buffer) \ + do { \ + ASSERT_EVBUFFER_LOCKED(buffer); \ + ((struct evbuffer*)(buffer))->lock_count--; \ + } while (0) + +#define EVBUFFER_LOCK(buffer, mode) \ + do { \ + EVLOCK_LOCK((buffer)->lock, (mode)); \ + _EVBUFFER_INCREMENT_LOCK_COUNT(buffer); \ + } while(0) +#define EVBUFFER_UNLOCK(buffer, mode) \ + do { \ + _EVBUFFER_DECREMENT_LOCK_COUNT(buffer); \ + EVLOCK_UNLOCK((buffer)->lock, (mode)); \ + } while(0) + +#define EVBUFFER_LOCK2(buffer1, buffer2) \ + do { \ + EVLOCK_LOCK2((buffer1)->lock, (buffer2)->lock, \ + EVTHREAD_WRITE, EVTHREAD_WRITE); \ + _EVBUFFER_INCREMENT_LOCK_COUNT(buffer1); \ + _EVBUFFER_INCREMENT_LOCK_COUNT(buffer2); \ + } while(0) +#define EVBUFFER_UNLOCK2(buffer1, buffer2) \ + do { \ + _EVBUFFER_DECREMENT_LOCK_COUNT(buffer1); \ + _EVBUFFER_DECREMENT_LOCK_COUNT(buffer2); \ + EVLOCK_UNLOCK2((buffer1)->lock, (buffer2)->lock, \ + EVTHREAD_WRITE, EVTHREAD_WRITE); \ + } while(0) + #ifdef __cplusplus } #endif diff --git a/include/event2/buffer.h b/include/event2/buffer.h index b10d1535..8e94acf1 100644 --- a/include/event2/buffer.h +++ b/include/event2/buffer.h @@ -101,6 +101,25 @@ struct evbuffer *evbuffer_new(void); */ void evbuffer_free(struct evbuffer *buf); +/** + Enable locking on an evbuffer so that it can safely be used by multiple + threads at the same time. + + NOTE: when locking is enabled, the lock will be held when callbacks are + invoked. This could result in deadlock if you aren't careful. Plan + accordingly! + + @param buf An evbuffer to make lockable. + @param lock A lock object, or NULL if we should allocate our own. + @return 0 on success, -1 on failure. + */ +int evbuffer_enable_locking(struct evbuffer *buf, void *lock); + +/* DOCDOC */ +void evbuffer_lock(struct evbuffer *buf); +void evbuffer_unlock(struct evbuffer *buf); + + /** Returns the total number of bytes stored in the event buffer @@ -421,7 +440,7 @@ struct evbuffer_cb_info { one: watch out! @param buffer the buffer whose size has changed - @param info a structure describing how the buffer changed + @param info a structure describing how the buffer changed. @param arg a pointer to user data */ typedef void (*evbuffer_cb_func)(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg);