From 682adc443b03a47c849f028474639c5d8254101c Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Wed, 30 Apr 2008 00:09:16 +0000 Subject: [PATCH] support input/output filters for bufferevents svn:r748 --- ChangeLog | 1 + Makefile.am | 4 +- bufferevent-internal.h | 61 +++++ evbuffer.c | 341 ++++++++++++++++++++++++---- include/event2/bufferevent.h | 145 +++++++++++- include/event2/bufferevent_struct.h | 24 +- test/regress.c | 96 ++++++++ 7 files changed, 631 insertions(+), 41 deletions(-) create mode 100644 bufferevent-internal.h diff --git a/ChangeLog b/ChangeLog index 43925821..3a5fa85b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -79,6 +79,7 @@ Changes in current version: o convert evhttp_connection to use bufferevents. o use libevent's internal timercmp on all platforms, to avoid bugs on old platforms where timercmp(a,b,<=) is buggy. o Remove the never-exported, never-used evhttp_hostportfile function. + o Support input/output filters for bufferevents; somewhat similar to libio's model. This will allow us to implement SSL, compression, etc, transparently to users of bufferevents such as the http layer. Changes in 1.4.0: o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr. diff --git a/Makefile.am b/Makefile.am index 5fab46a0..22d96790 100644 --- a/Makefile.am +++ b/Makefile.am @@ -81,7 +81,9 @@ event-config.h: config.h -e 's/#ifndef /#ifndef _EVENT_/' < config.h >> $@ echo "#endif" >> $@ -CORE_SRC = event.c buffer.c evbuffer.c log.c evutil.c $(SYS_SRC) +CORE_SRC = event.c buffer.c evbuffer-internal.h evbuffer.c \ + bufferevent-internal.h \ + log.c evutil.c $(SYS_SRC) EXTRA_SRC = event_tagging.c http.c evhttp.h http-internal.h evdns.c \ evdns.h evrpc.c evrpc.h evrpc-internal.h mm-internal.h \ strlcpy.c strlcpy-internal.h strlcpy-internal.h diff --git a/bufferevent-internal.h b/bufferevent-internal.h new file mode 100644 index 00000000..b8803eaa --- /dev/null +++ b/bufferevent-internal.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2008 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _BUFFEREVENT_INTERNAL_H_ +#define _BUFFEREVENT_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "config.h" +#include "evutil.h" + +struct bufferevent_filter { + /** allows chaining of filters */ + TAILQ_ENTRY(bufferevent_filter) (next); + + /** used for intermediary state either on the input or output path */ + struct evbuffer *buffer; + + /** initializes the context provided to process */ + void (*init_context)(void *); + + /** frees any context related to ctx */ + void (*free_context)(void *); + + enum bufferevent_filter_result (*process)( + struct evbuffer *src, struct evbuffer *dst, + enum bufferevent_filter_state flags, void *ctx); + + void *ctx; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* _BUFFEREVENT_INTERNAL_H_ */ diff --git a/evbuffer.c b/evbuffer.c index 9442da67..faf6cdb1 100644 --- a/evbuffer.c +++ b/evbuffer.c @@ -34,6 +34,7 @@ #ifdef HAVE_SYS_TIME_H #include #endif +#include #include #include @@ -52,11 +53,17 @@ #include "event2/buffer.h" #include "event2/bufferevent_struct.h" #include "event2/event.h" +#include "log.h" #include "mm-internal.h" +#include "bufferevent-internal.h" /* prototypes */ -void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); +static void bufferevent_read_pressure_cb( + struct evbuffer *, size_t, size_t, void *); +static int bufferevent_process_filters( + struct bufferevent_filter *, struct evbuffer *, + enum bufferevent_filter_state); static int bufferevent_add(struct event *ev, int timeout) @@ -77,7 +84,7 @@ bufferevent_add(struct event *ev, int timeout) * We use it to apply back pressure on the reading side. */ -void +static void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, void *arg) { struct bufferevent *bufev = arg; @@ -93,13 +100,47 @@ bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, } } +static void +bufferevent_read_closure(struct bufferevent *bufev, int progress) +{ + size_t len; + + bufferevent_add(&bufev->ev_read, bufev->timeout_read); + + /* nothing user visible changed? */ + if (!progress) + return; + + /* See if this callbacks meets the water marks */ + len = EVBUFFER_LENGTH(bufev->input); + if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) + return; + + /* For read pressure, we use the buffer exposed to the users. + * Filters can arbitrarily change the data that users get to see, + * in particular, a user might select a watermark that is smaller + * then what a filter needs to make progress. + */ + if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { + event_del(&bufev->ev_read); + + /* Now schedule a callback for us when the buffer changes */ + evbuffer_setcb(bufev->input, + bufferevent_read_pressure_cb, bufev); + } + + /* Invoke the user callback - must always be called last */ + if (bufev->readcb != NULL) + (*bufev->readcb)(bufev, bufev->cbarg); +} + static void bufferevent_readcb(evutil_socket_t fd, short event, void *arg) { struct bufferevent *bufev = arg; - int res = 0; + struct evbuffer *input; + int res = 0, progress = 1; short what = EVBUFFER_READ; - size_t len; int howmuch = -1; if (event == EV_TIMEOUT) { @@ -107,23 +148,28 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) goto error; } + if (TAILQ_FIRST(&bufev->input_filters) != NULL) + input = TAILQ_FIRST(&bufev->input_filters)->buffer; + else + input = bufev->input; + /* * If we have a high watermark configured then we don't want to * read more data than would make us reach the watermark. */ if (bufev->wm_read.high != 0) { - howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); + howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input); /* we might have lowered the watermark, stop reading */ if (howmuch <= 0) { - struct evbuffer *buf = bufev->input; event_del(&bufev->ev_read); - evbuffer_setcb(buf, + evbuffer_setcb(input, bufferevent_read_pressure_cb, bufev); return; } } - res = evbuffer_read(bufev->input, fd, howmuch); + res = evbuffer_read(input, fd, howmuch); + if (res == -1) { if (errno == EAGAIN || errno == EINTR) goto reschedule; @@ -134,26 +180,27 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) what |= EVBUFFER_EOF; } + if (TAILQ_FIRST(&bufev->input_filters) != NULL) { + int state = BEV_NORMAL; + if (what & EVBUFFER_EOF) + state = BEV_FLUSH; + /* XXX(niels): what to do about EVBUFFER_ERROR? */ + progress = bufferevent_process_filters( + TAILQ_FIRST(&bufev->input_filters), + bufev->input, + state); + + /* propagate potential errors to the user */ + if (progress == -1) { + res = -1; + what |= EVBUFFER_ERROR; + } + } + if (res <= 0) goto error; - bufferevent_add(&bufev->ev_read, bufev->timeout_read); - - /* See if this callbacks meets the water marks */ - len = EVBUFFER_LENGTH(bufev->input); - if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) - return; - if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { - struct evbuffer *buf = bufev->input; - event_del(&bufev->ev_read); - - /* Now schedule a callback for us when the buffer changes */ - evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); - } - - /* Invoke the user callback - must always be called last */ - if (bufev->readcb != NULL) - (*bufev->readcb)(bufev, bufev->cbarg); + bufferevent_read_closure(bufev, progress); return; reschedule: @@ -266,6 +313,9 @@ bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb, */ bufev->enabled = EV_WRITE; + TAILQ_INIT(&bufev->input_filters); + TAILQ_INIT(&bufev->output_filters); + return (bufev); } @@ -283,6 +333,8 @@ bufferevent_setcb(struct bufferevent *bufev, void bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd) { + struct bufferevent_filter *filter; + event_del(&bufev->ev_read); event_del(&bufev->ev_write); @@ -293,6 +345,21 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd) event_base_set(bufev->ev_base, &bufev->ev_write); } + /* we need to free all filter contexts and then init them again */ + TAILQ_FOREACH(filter, &bufev->input_filters, next) { + if (filter->free_context) + filter->free_context(filter->ctx); + if (filter->init_context) + filter->init_context(filter->ctx); + } + + TAILQ_FOREACH(filter, &bufev->output_filters, next) { + if (filter->free_context) + filter->free_context(filter->ctx); + if (filter->init_context) + filter->init_context(filter->ctx); + } + /* might have to manually trigger event registration */ } @@ -305,7 +372,9 @@ bufferevent_input(struct bufferevent *bufev) struct evbuffer * bufferevent_output(struct bufferevent *bufev) { - return (bufev->output); + return TAILQ_FIRST(&bufev->output_filters) != NULL ? + TAILQ_FIRST(&bufev->output_filters)->buffer : + bufev->output; } int @@ -324,21 +393,55 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority) void bufferevent_free(struct bufferevent *bufev) { + struct bufferevent_filter *filter; + event_del(&bufev->ev_read); event_del(&bufev->ev_write); evbuffer_free(bufev->input); evbuffer_free(bufev->output); + /* free input and output filters */ + while ((filter = TAILQ_FIRST(&bufev->input_filters)) != NULL) { + bufferevent_filter_remove(bufev, BEV_INPUT, filter); + + bufferevent_filter_free(filter); + } + + while ((filter = TAILQ_FIRST(&bufev->output_filters)) != NULL) { + bufferevent_filter_remove(bufev, BEV_OUTPUT, filter); + + bufferevent_filter_free(filter); + } + mm_free(bufev); } - -static inline void +/* + * Executes filters on the written data and schedules a network write if + * necessary. + */ +static inline int bufferevent_write_closure(struct bufferevent *bufev, int progress) { + /* if no data was written, we do not need to do anything */ + if (!progress) + return (0); + + if (TAILQ_FIRST(&bufev->output_filters) != NULL) { + progress = bufferevent_process_filters( + TAILQ_FIRST(&bufev->output_filters), + bufev->output, BEV_NORMAL); + if (progress == -1) { + (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg); + return (-1); + } + } + /* If everything is okay, we need to schedule a write */ - if (progress && (bufev->enabled & EV_WRITE)) + if (bufev->enabled & EV_WRITE) bufferevent_add(&bufev->ev_write, bufev->timeout_write); + + return (0); } /* @@ -349,25 +452,34 @@ bufferevent_write_closure(struct bufferevent *bufev, int progress) int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) { - if (evbuffer_add(bufev->output, data, size) == -1) + struct evbuffer *output; + + if (TAILQ_FIRST(&bufev->output_filters) != NULL) + output = TAILQ_FIRST(&bufev->output_filters)->buffer; + else + output = bufev->output; + + if (evbuffer_add(output, data, size) == -1) return (-1); - bufferevent_write_closure(bufev, size > 0); - - return (0); + return (bufferevent_write_closure(bufev, size > 0)); } int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) { - int len = EVBUFFER_LENGTH(bufev->output); + int len = EVBUFFER_LENGTH(buf); + struct evbuffer *output; - if (evbuffer_add_buffer(bufev->output, buf) == -1) + if (TAILQ_FIRST(&bufev->output_filters) != NULL) + output = TAILQ_FIRST(&bufev->output_filters)->buffer; + else + output = bufev->output; + + if (evbuffer_add_buffer(output, buf) == -1) return (-1); - - bufferevent_write_closure(bufev, len > 0); - return (0); + return (bufferevent_write_closure(bufev, len > 0)); } size_t @@ -462,3 +574,156 @@ bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) res = event_base_set(base, &bufev->ev_write); return (res); } + +/* + * Filtering stuff + */ + +struct bufferevent_filter * +bufferevent_filter_new( + void (*init_context)(void *ctx), + void (*free_context)(void *ctx), + enum bufferevent_filter_result (*process)( + struct evbuffer *src, struct evbuffer *dst, + enum bufferevent_filter_state flags, void *ctx), void *ctx) +{ + struct bufferevent_filter *filter; + + if ((filter = mm_malloc(sizeof(struct bufferevent_filter))) == NULL) + return (NULL); + + if ((filter->buffer = evbuffer_new()) == NULL) { + mm_free(filter); + return (NULL); + } + + filter->init_context = init_context; + filter->free_context = free_context; + filter->process = process; + filter->ctx = ctx; + + return (filter); +} + +void +bufferevent_filter_free(struct bufferevent_filter *filter) +{ + evbuffer_free(filter->buffer); + mm_free(filter); +} + +void +bufferevent_filter_insert(struct bufferevent *bufev, + enum bufferevent_filter_type filter_type, + struct bufferevent_filter *filter) +{ + switch (filter_type) { + case BEV_INPUT: + TAILQ_INSERT_TAIL(&bufev->input_filters, filter, next); + break; + case BEV_OUTPUT: + TAILQ_INSERT_HEAD(&bufev->output_filters, filter, next); + break; + default: + event_errx(1, "illegal filter type %d", filter_type); + } + + if (filter->init_context) + filter->init_context(filter->ctx); +} + +void +bufferevent_filter_remove(struct bufferevent *bufev, + enum bufferevent_filter_type filter_type, + struct bufferevent_filter *filter) +{ + switch (filter_type) { + case BEV_INPUT: + TAILQ_REMOVE(&bufev->input_filters, filter, next); + break; + case BEV_OUTPUT: + TAILQ_REMOVE(&bufev->output_filters, filter, next); + break; + default: + event_errx(1, "illegal filter type %d", filter_type); + } + + evbuffer_drain(filter->buffer, -1); + + if (filter->free_context) + filter->free_context(filter->ctx); + +} + +static int +bufferevent_process_filters( + struct bufferevent_filter *filter, struct evbuffer *final, + enum bufferevent_filter_state state) +{ + struct evbuffer *src, *dst; + struct bufferevent_filter *next; + int len = EVBUFFER_LENGTH(final); + + for (; filter != NULL; filter = next) { + int res; + + next = TAILQ_NEXT(filter, next); + src = filter->buffer; + dst = next != NULL ? next->buffer : final; + + res = (*filter->process)(src, dst, state, filter->ctx); + + /* an error causes complete termination of the bufferevent */ + if (res == BEV_ERROR) + return (-1); + + /* a read filter indicated that it cannot produce + * further data, we do not need to invoke any + * subsequent filters. Unless, a flush or something + * similar was specifically requested. + */ + if (res == BEV_NEED_MORE && state == BEV_NORMAL) + return (0); + } + + /* we made user visible progress if the buffer size changed */ + return (EVBUFFER_LENGTH(final) != len); +} + +int +bufferevent_trigger_filter(struct bufferevent *bufev, + struct bufferevent_filter *filter, int iotype, + enum bufferevent_filter_state state) +{ + struct evbuffer *dst = iotype == BEV_INPUT ? + bufev->input : bufev->output; + int progress; + + /* trigger all filters if filter is not specified */ + if (filter == NULL) { + struct bufferevent_filterq *head = BEV_INPUT ? + &bufev->input_filters : &bufev->output_filters; + filter = TAILQ_FIRST(head); + } + + progress = bufferevent_process_filters(filter, dst, state); + if (progress == -1) { + (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg); + return (-1); + } + + switch (iotype) { + case BEV_INPUT: + bufferevent_read_closure(bufev, progress); + break; + case BEV_OUTPUT: + if (progress && (bufev->enabled & EV_WRITE)) + bufferevent_add( + &bufev->ev_write, bufev->timeout_write); + break; + default: + event_errx(1, "Illegal bufferevent iotype: %d", iotype); + } + + return (0); +} diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 5ef4e8bc..3a630805 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -228,6 +228,9 @@ struct evbuffer *bufferevent_input(struct bufferevent *bufev); /** Returns the outut buffer. + When filters are being used, the filters need to be manually + triggered if the output buffer was manipulated. + @param bufev the buffervent from which to get the evbuffer @return the evbuffer object for the output buffer */ @@ -244,7 +247,6 @@ struct evbuffer *bufferevent_output(struct bufferevent *bufev); */ int bufferevent_enable(struct bufferevent *bufev, short event); - /** Disable a bufferevent. @@ -288,6 +290,147 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events, #define EVBUFFER_INPUT(x) bufferevent_input(x) #define EVBUFFER_OUTPUT(x) bufferevent_output(x) +/** + Support for filtering input and output of bufferevents. + */ + +/** + Flags that can be passed into filters to let them know how to + deal with the incoming data. +*/ +enum bufferevent_filter_state { + /** usually set when processing data */ + BEV_NORMAL = 0, + + /** encountered EOF on read or done sending data */ + BEV_FLUSH = 1, +}; + +/** + Values that filters can return. + */ +enum bufferevent_filter_result { + /** everything is okay */ + BEV_OK = 0, + + /** the filter needs to read more data before output */ + BEV_NEED_MORE = 1, + + /** the filter enountered a critical error, no further data + can be processed. */ + BEV_ERROR = 2 +}; + +struct bufferevent_filter; + +/** + Creates a new filtering object for a bufferevent. + + Filters can be used to implement compression, authentication, rate limiting, + etc. for bufferevents. Filters can be associated with the input or output + path or both. Filters need to be inserted with bufferevent_filter_insert() + on either the input or output path. + + For example, when implementing compression, both an input and an + output filters are required. The output filter compress all output + as it passes along whereas the input filter decompresses all input as + it is being read from the network. + + Some filters may require specificaly behavior such as flushing their buffers + on EOF. To allom them to do that, a bufferevent will invoke the filter + with BEV_FLUSH to let it know that EOF has been reached. + + When a filter needs more data before it can output any data, it may return + BEV_NEED_MORE in which case the filter chain is being interrupted until + more data arrives. A filter can indicate a fatal error by returning + BEV_ERROR. Otherwise, it should return BEV_OK. + + @param init_context an optional function that initializes the ctx parameter. + @param free_context an optional function to free memory associated with the + ctx parameter. + @param process the filtering function that should be invokved either during + input or output depending on where the filter should be attached. + @param ctx additional context that can be passed to the process function + @return a bufferevent_filter object that can subsequently be installed +*/ +struct bufferevent_filter *bufferevent_filter_new( + void (*init_context)(void *), + void (*free_context)(void *), + enum bufferevent_filter_result (*process)( + struct evbuffer *src, struct evbuffer *dst, + enum bufferevent_filter_state state, void *ctx), void *ctx); + +/** + Frees the filter object. + + It must have been removed from the bufferevent before it can be freed. + + @param filter the filter to be freed + @see bufferevent_filter_remove() +*/ +void bufferevent_filter_free(struct bufferevent_filter *filter); + +/** Filter types for inserting or removing filters */ +enum bufferevent_filter_type { + /** filter is being used for input */ + BEV_INPUT = 0, + + /** filter is being used for output */ + BEV_OUTPUT = 1 +}; + +/** + Inserts a filter into the processing of data for bufferevent. + + A filter can be inserted only once. It can not be used again for + another insert unless it have been removed via + bufferevent_filter_remove() first. + + Input filters are inserted at the end, output filters at the + beginning of the queue. + + @param bufev the bufferevent object into which to install the filter + @param filter_type either BEV_INPUT or BEV_OUTPUT + @param filter the filter object + @see bufferevent_filter_remove() + */ +void bufferevent_filter_insert(struct bufferevent *bufev, + enum bufferevent_filter_type filter_type, + struct bufferevent_filter *filter); + +/** + Removes a filter from the bufferevent. + + A filter should be flushed via buffervent_trigger_filter before removing + it from a bufferevent. Any remaining intermediate buffer data is going + to be lost. + + @param bufev the bufferevent object from which to remove the filter + @param filter_type either BEV_INPUT or BEV_OUTPUT + @param filter the filter object or NULL to trigger all filters + @see bufferevent_trigger_filter() +*/ +void bufferevent_filter_remove(struct bufferevent *bufev, + enum bufferevent_filter_type filter_type, + struct bufferevent_filter *filter); + +/** + Triggers the filter chain the specified filter to produce more + data is possible. This is primarily for time-based filters such + as rate-limiting to produce more data as time passes. + + @param bufev the bufferevent object to which the filter belongs + @param filter the bufferevent filter at which to start + @param iotype either BEV_INPUT or BEV_OUTPUT depending on where the filter + was installed + @param state either BEV_NORMAL or BEV_FLUSH + @return -1 on failure, 0 if no data was produces, 1 if data was produced + */ + +int +bufferevent_trigger_filter(struct bufferevent *bufev, + struct bufferevent_filter *filter, int iotype, + enum bufferevent_filter_state state); #ifdef __cplusplus } diff --git a/include/event2/bufferevent_struct.h b/include/event2/bufferevent_struct.h index f35ba2a2..8616a9a6 100644 --- a/include/event2/bufferevent_struct.h +++ b/include/event2/bufferevent_struct.h @@ -64,6 +64,25 @@ struct event_watermark { size_t high; }; +struct bufferevent_filter; + +/* Fix so that ppl dont have to run with */ +#ifndef TAILQ_HEAD +#define _EVENT_DEFINED_TQHEAD +#define TAILQ_HEAD(name, type) \ +struct name { \ + struct type *tqh_first; /* first element */ \ + struct type **tqh_last; /* addr of last next element */ \ +} +#endif /* !TAILQ_HEAD */ + +TAILQ_HEAD(bufferevent_filterq, bufferevent_filter); + +#ifdef _EVENT_DEFINED_TQHEAD +#undef TAILQ_HEAD +#undef _EVENT_DEFINED_TQHEAD +#endif /* _EVENT_DEFINED_TQHEAD */ + struct bufferevent { struct event_base *ev_base; @@ -85,8 +104,11 @@ struct bufferevent { int timeout_write; /* in seconds */ short enabled; /* events that are currently enabled */ -}; + /** the list of input and output filters */ + struct bufferevent_filterq input_filters; + struct bufferevent_filterq output_filters; +}; #ifdef __cplusplus } diff --git a/test/regress.c b/test/regress.c index 55d68609..e73d5dd2 100644 --- a/test/regress.c +++ b/test/regress.c @@ -1443,6 +1443,101 @@ test_bufferevent_watermarks(void) cleanup_test(); } +/* + * Test bufferevent filters + */ + +/* strip an 'x' from each byte */ + +static enum bufferevent_filter_result +bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst, + enum bufferevent_filter_state state, void *ctx) +{ + const unsigned char *buffer; + int i; + + if (state == BEV_FREE_DATA) + return (BEV_OK); + + buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src)); + for (i = 0; i < EVBUFFER_LENGTH(src); i += 2) { + assert(buffer[i] == 'x'); + evbuffer_add(dst, buffer + i + 1, 1); + + if (i + 2 > EVBUFFER_LENGTH(src)) + break; + } + + evbuffer_drain(src, i); + return (BEV_OK); +} + +/* add an 'x' before each byte */ + +static enum bufferevent_filter_result +bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, + enum bufferevent_filter_state state, void *ctx) +{ + const unsigned char *buffer; + int i; + + if (state == BEV_FREE_DATA) + return (BEV_OK); + + buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src)); + for (i = 0; i < EVBUFFER_LENGTH(src); ++i) { + evbuffer_add(dst, "x", 1); + evbuffer_add(dst, buffer + i, 1); + } + + evbuffer_drain(src, EVBUFFER_LENGTH(src)); + return (BEV_OK); +} + +static void +test_bufferevent_filters(void) +{ + struct bufferevent *bev1, *bev2; + struct bufferevent_filter *finput, *foutput; + char buffer[8333]; + int i; + + setup_test("Bufferevent Filters: "); + + bev1 = bufferevent_new(pair[0], NULL, writecb, errorcb, NULL); + bev2 = bufferevent_new(pair[1], readcb, NULL, errorcb, NULL); + + bufferevent_disable(bev1, EV_READ); + bufferevent_enable(bev2, EV_READ); + + for (i = 0; i < sizeof(buffer); i++) + buffer[i] = i; + + /* insert some filters */ + finput = bufferevent_filter_new( + NULL, NULL,bufferevent_input_filter, NULL); + foutput = bufferevent_filter_new( + NULL, NULL, bufferevent_output_filter, NULL); + + bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput); + bufferevent_filter_insert(bev2, BEV_INPUT, finput); + + bufferevent_write(bev1, buffer, sizeof(buffer)); + + event_dispatch(); + + bufferevent_filter_remove(bev1, BEV_OUTPUT, foutput); + bufferevent_filter_free(foutput); + + bufferevent_free(bev1); + bufferevent_free(bev2); + + if (test_ok != 2) + test_ok = 0; + + cleanup_test(); +} + struct test_pri_event { struct event ev; int count; @@ -1944,6 +2039,7 @@ main (int argc, char **argv) test_bufferevent(); test_bufferevent_watermarks(); + test_bufferevent_filters(); test_free_active_base();