support input/output filters for bufferevents

svn:r748
This commit is contained in:
Niels Provos 2008-04-30 00:09:16 +00:00
parent 28add6b9e2
commit 682adc443b
7 changed files with 631 additions and 41 deletions

View File

@ -79,6 +79,7 @@ Changes in current version:
o convert evhttp_connection to use bufferevents.
o use libevent's internal timercmp on all platforms, to avoid bugs on old platforms where timercmp(a,b,<=) is buggy.
o Remove the never-exported, never-used evhttp_hostportfile function.
o Support input/output filters for bufferevents; somewhat similar to libio's model. This will allow us to implement SSL, compression, etc, transparently to users of bufferevents such as the http layer.
Changes in 1.4.0:
o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr.

View File

@ -81,7 +81,9 @@ event-config.h: config.h
-e 's/#ifndef /#ifndef _EVENT_/' < config.h >> $@
echo "#endif" >> $@
CORE_SRC = event.c buffer.c evbuffer.c log.c evutil.c $(SYS_SRC)
CORE_SRC = event.c buffer.c evbuffer-internal.h evbuffer.c \
bufferevent-internal.h \
log.c evutil.c $(SYS_SRC)
EXTRA_SRC = event_tagging.c http.c evhttp.h http-internal.h evdns.c \
evdns.h evrpc.c evrpc.h evrpc-internal.h mm-internal.h \
strlcpy.c strlcpy-internal.h strlcpy-internal.h

61
bufferevent-internal.h Normal file
View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2008 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BUFFEREVENT_INTERNAL_H_
#define _BUFFEREVENT_INTERNAL_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "config.h"
#include "evutil.h"
struct bufferevent_filter {
/** allows chaining of filters */
TAILQ_ENTRY(bufferevent_filter) (next);
/** used for intermediary state either on the input or output path */
struct evbuffer *buffer;
/** initializes the context provided to process */
void (*init_context)(void *);
/** frees any context related to ctx */
void (*free_context)(void *);
enum bufferevent_filter_result (*process)(
struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state flags, void *ctx);
void *ctx;
};
#ifdef __cplusplus
}
#endif
#endif /* _BUFFEREVENT_INTERNAL_H_ */

View File

@ -34,6 +34,7 @@
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <sys/queue.h>
#include <errno.h>
#include <stdio.h>
@ -52,11 +53,17 @@
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "log.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
/* prototypes */
void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
static void bufferevent_read_pressure_cb(
struct evbuffer *, size_t, size_t, void *);
static int bufferevent_process_filters(
struct bufferevent_filter *, struct evbuffer *,
enum bufferevent_filter_state);
static int
bufferevent_add(struct event *ev, int timeout)
@ -77,7 +84,7 @@ bufferevent_add(struct event *ev, int timeout)
* We use it to apply back pressure on the reading side.
*/
void
static void
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg) {
struct bufferevent *bufev = arg;
@ -93,13 +100,47 @@ bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
}
}
static void
bufferevent_read_closure(struct bufferevent *bufev, int progress)
{
size_t len;
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
/* nothing user visible changed? */
if (!progress)
return;
/* See if this callbacks meets the water marks */
len = EVBUFFER_LENGTH(bufev->input);
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
/* For read pressure, we use the buffer exposed to the users.
* Filters can arbitrarily change the data that users get to see,
* in particular, a user might select a watermark that is smaller
* then what a filter needs to make progress.
*/
if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
event_del(&bufev->ev_read);
/* Now schedule a callback for us when the buffer changes */
evbuffer_setcb(bufev->input,
bufferevent_read_pressure_cb, bufev);
}
/* Invoke the user callback - must always be called last */
if (bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
}
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
struct evbuffer *input;
int res = 0, progress = 1;
short what = EVBUFFER_READ;
size_t len;
int howmuch = -1;
if (event == EV_TIMEOUT) {
@ -107,23 +148,28 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
goto error;
}
if (TAILQ_FIRST(&bufev->input_filters) != NULL)
input = TAILQ_FIRST(&bufev->input_filters)->buffer;
else
input = bufev->input;
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input);
/* we might have lowered the watermark, stop reading */
if (howmuch <= 0) {
struct evbuffer *buf = bufev->input;
event_del(&bufev->ev_read);
evbuffer_setcb(buf,
evbuffer_setcb(input,
bufferevent_read_pressure_cb, bufev);
return;
}
}
res = evbuffer_read(bufev->input, fd, howmuch);
res = evbuffer_read(input, fd, howmuch);
if (res == -1) {
if (errno == EAGAIN || errno == EINTR)
goto reschedule;
@ -134,26 +180,27 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
what |= EVBUFFER_EOF;
}
if (TAILQ_FIRST(&bufev->input_filters) != NULL) {
int state = BEV_NORMAL;
if (what & EVBUFFER_EOF)
state = BEV_FLUSH;
/* XXX(niels): what to do about EVBUFFER_ERROR? */
progress = bufferevent_process_filters(
TAILQ_FIRST(&bufev->input_filters),
bufev->input,
state);
/* propagate potential errors to the user */
if (progress == -1) {
res = -1;
what |= EVBUFFER_ERROR;
}
}
if (res <= 0)
goto error;
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
/* See if this callbacks meets the water marks */
len = EVBUFFER_LENGTH(bufev->input);
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
struct evbuffer *buf = bufev->input;
event_del(&bufev->ev_read);
/* Now schedule a callback for us when the buffer changes */
evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
}
/* Invoke the user callback - must always be called last */
if (bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
bufferevent_read_closure(bufev, progress);
return;
reschedule:
@ -266,6 +313,9 @@ bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb,
*/
bufev->enabled = EV_WRITE;
TAILQ_INIT(&bufev->input_filters);
TAILQ_INIT(&bufev->output_filters);
return (bufev);
}
@ -283,6 +333,8 @@ bufferevent_setcb(struct bufferevent *bufev,
void
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
struct bufferevent_filter *filter;
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
@ -293,6 +345,21 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
event_base_set(bufev->ev_base, &bufev->ev_write);
}
/* we need to free all filter contexts and then init them again */
TAILQ_FOREACH(filter, &bufev->input_filters, next) {
if (filter->free_context)
filter->free_context(filter->ctx);
if (filter->init_context)
filter->init_context(filter->ctx);
}
TAILQ_FOREACH(filter, &bufev->output_filters, next) {
if (filter->free_context)
filter->free_context(filter->ctx);
if (filter->init_context)
filter->init_context(filter->ctx);
}
/* might have to manually trigger event registration */
}
@ -305,7 +372,9 @@ bufferevent_input(struct bufferevent *bufev)
struct evbuffer *
bufferevent_output(struct bufferevent *bufev)
{
return (bufev->output);
return TAILQ_FIRST(&bufev->output_filters) != NULL ?
TAILQ_FIRST(&bufev->output_filters)->buffer :
bufev->output;
}
int
@ -324,21 +393,55 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority)
void
bufferevent_free(struct bufferevent *bufev)
{
struct bufferevent_filter *filter;
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
/* free input and output filters */
while ((filter = TAILQ_FIRST(&bufev->input_filters)) != NULL) {
bufferevent_filter_remove(bufev, BEV_INPUT, filter);
bufferevent_filter_free(filter);
}
while ((filter = TAILQ_FIRST(&bufev->output_filters)) != NULL) {
bufferevent_filter_remove(bufev, BEV_OUTPUT, filter);
bufferevent_filter_free(filter);
}
mm_free(bufev);
}
static inline void
/*
* Executes filters on the written data and schedules a network write if
* necessary.
*/
static inline int
bufferevent_write_closure(struct bufferevent *bufev, int progress)
{
/* if no data was written, we do not need to do anything */
if (!progress)
return (0);
if (TAILQ_FIRST(&bufev->output_filters) != NULL) {
progress = bufferevent_process_filters(
TAILQ_FIRST(&bufev->output_filters),
bufev->output, BEV_NORMAL);
if (progress == -1) {
(*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
return (-1);
}
}
/* If everything is okay, we need to schedule a write */
if (progress && (bufev->enabled & EV_WRITE))
if (bufev->enabled & EV_WRITE)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
return (0);
}
/*
@ -349,25 +452,34 @@ bufferevent_write_closure(struct bufferevent *bufev, int progress)
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
if (evbuffer_add(bufev->output, data, size) == -1)
struct evbuffer *output;
if (TAILQ_FIRST(&bufev->output_filters) != NULL)
output = TAILQ_FIRST(&bufev->output_filters)->buffer;
else
output = bufev->output;
if (evbuffer_add(output, data, size) == -1)
return (-1);
bufferevent_write_closure(bufev, size > 0);
return (0);
return (bufferevent_write_closure(bufev, size > 0));
}
int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
int len = EVBUFFER_LENGTH(bufev->output);
int len = EVBUFFER_LENGTH(buf);
struct evbuffer *output;
if (evbuffer_add_buffer(bufev->output, buf) == -1)
if (TAILQ_FIRST(&bufev->output_filters) != NULL)
output = TAILQ_FIRST(&bufev->output_filters)->buffer;
else
output = bufev->output;
if (evbuffer_add_buffer(output, buf) == -1)
return (-1);
bufferevent_write_closure(bufev, len > 0);
return (0);
return (bufferevent_write_closure(bufev, len > 0));
}
size_t
@ -462,3 +574,156 @@ bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
res = event_base_set(base, &bufev->ev_write);
return (res);
}
/*
* Filtering stuff
*/
struct bufferevent_filter *
bufferevent_filter_new(
void (*init_context)(void *ctx),
void (*free_context)(void *ctx),
enum bufferevent_filter_result (*process)(
struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state flags, void *ctx), void *ctx)
{
struct bufferevent_filter *filter;
if ((filter = mm_malloc(sizeof(struct bufferevent_filter))) == NULL)
return (NULL);
if ((filter->buffer = evbuffer_new()) == NULL) {
mm_free(filter);
return (NULL);
}
filter->init_context = init_context;
filter->free_context = free_context;
filter->process = process;
filter->ctx = ctx;
return (filter);
}
void
bufferevent_filter_free(struct bufferevent_filter *filter)
{
evbuffer_free(filter->buffer);
mm_free(filter);
}
void
bufferevent_filter_insert(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter)
{
switch (filter_type) {
case BEV_INPUT:
TAILQ_INSERT_TAIL(&bufev->input_filters, filter, next);
break;
case BEV_OUTPUT:
TAILQ_INSERT_HEAD(&bufev->output_filters, filter, next);
break;
default:
event_errx(1, "illegal filter type %d", filter_type);
}
if (filter->init_context)
filter->init_context(filter->ctx);
}
void
bufferevent_filter_remove(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter)
{
switch (filter_type) {
case BEV_INPUT:
TAILQ_REMOVE(&bufev->input_filters, filter, next);
break;
case BEV_OUTPUT:
TAILQ_REMOVE(&bufev->output_filters, filter, next);
break;
default:
event_errx(1, "illegal filter type %d", filter_type);
}
evbuffer_drain(filter->buffer, -1);
if (filter->free_context)
filter->free_context(filter->ctx);
}
static int
bufferevent_process_filters(
struct bufferevent_filter *filter, struct evbuffer *final,
enum bufferevent_filter_state state)
{
struct evbuffer *src, *dst;
struct bufferevent_filter *next;
int len = EVBUFFER_LENGTH(final);
for (; filter != NULL; filter = next) {
int res;
next = TAILQ_NEXT(filter, next);
src = filter->buffer;
dst = next != NULL ? next->buffer : final;
res = (*filter->process)(src, dst, state, filter->ctx);
/* an error causes complete termination of the bufferevent */
if (res == BEV_ERROR)
return (-1);
/* a read filter indicated that it cannot produce
* further data, we do not need to invoke any
* subsequent filters. Unless, a flush or something
* similar was specifically requested.
*/
if (res == BEV_NEED_MORE && state == BEV_NORMAL)
return (0);
}
/* we made user visible progress if the buffer size changed */
return (EVBUFFER_LENGTH(final) != len);
}
int
bufferevent_trigger_filter(struct bufferevent *bufev,
struct bufferevent_filter *filter, int iotype,
enum bufferevent_filter_state state)
{
struct evbuffer *dst = iotype == BEV_INPUT ?
bufev->input : bufev->output;
int progress;
/* trigger all filters if filter is not specified */
if (filter == NULL) {
struct bufferevent_filterq *head = BEV_INPUT ?
&bufev->input_filters : &bufev->output_filters;
filter = TAILQ_FIRST(head);
}
progress = bufferevent_process_filters(filter, dst, state);
if (progress == -1) {
(*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
return (-1);
}
switch (iotype) {
case BEV_INPUT:
bufferevent_read_closure(bufev, progress);
break;
case BEV_OUTPUT:
if (progress && (bufev->enabled & EV_WRITE))
bufferevent_add(
&bufev->ev_write, bufev->timeout_write);
break;
default:
event_errx(1, "Illegal bufferevent iotype: %d", iotype);
}
return (0);
}

View File

@ -228,6 +228,9 @@ struct evbuffer *bufferevent_input(struct bufferevent *bufev);
/**
Returns the outut buffer.
When filters are being used, the filters need to be manually
triggered if the output buffer was manipulated.
@param bufev the buffervent from which to get the evbuffer
@return the evbuffer object for the output buffer
*/
@ -244,7 +247,6 @@ struct evbuffer *bufferevent_output(struct bufferevent *bufev);
*/
int bufferevent_enable(struct bufferevent *bufev, short event);
/**
Disable a bufferevent.
@ -288,6 +290,147 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events,
#define EVBUFFER_INPUT(x) bufferevent_input(x)
#define EVBUFFER_OUTPUT(x) bufferevent_output(x)
/**
Support for filtering input and output of bufferevents.
*/
/**
Flags that can be passed into filters to let them know how to
deal with the incoming data.
*/
enum bufferevent_filter_state {
/** usually set when processing data */
BEV_NORMAL = 0,
/** encountered EOF on read or done sending data */
BEV_FLUSH = 1,
};
/**
Values that filters can return.
*/
enum bufferevent_filter_result {
/** everything is okay */
BEV_OK = 0,
/** the filter needs to read more data before output */
BEV_NEED_MORE = 1,
/** the filter enountered a critical error, no further data
can be processed. */
BEV_ERROR = 2
};
struct bufferevent_filter;
/**
Creates a new filtering object for a bufferevent.
Filters can be used to implement compression, authentication, rate limiting,
etc. for bufferevents. Filters can be associated with the input or output
path or both. Filters need to be inserted with bufferevent_filter_insert()
on either the input or output path.
For example, when implementing compression, both an input and an
output filters are required. The output filter compress all output
as it passes along whereas the input filter decompresses all input as
it is being read from the network.
Some filters may require specificaly behavior such as flushing their buffers
on EOF. To allom them to do that, a bufferevent will invoke the filter
with BEV_FLUSH to let it know that EOF has been reached.
When a filter needs more data before it can output any data, it may return
BEV_NEED_MORE in which case the filter chain is being interrupted until
more data arrives. A filter can indicate a fatal error by returning
BEV_ERROR. Otherwise, it should return BEV_OK.
@param init_context an optional function that initializes the ctx parameter.
@param free_context an optional function to free memory associated with the
ctx parameter.
@param process the filtering function that should be invokved either during
input or output depending on where the filter should be attached.
@param ctx additional context that can be passed to the process function
@return a bufferevent_filter object that can subsequently be installed
*/
struct bufferevent_filter *bufferevent_filter_new(
void (*init_context)(void *),
void (*free_context)(void *),
enum bufferevent_filter_result (*process)(
struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx), void *ctx);
/**
Frees the filter object.
It must have been removed from the bufferevent before it can be freed.
@param filter the filter to be freed
@see bufferevent_filter_remove()
*/
void bufferevent_filter_free(struct bufferevent_filter *filter);
/** Filter types for inserting or removing filters */
enum bufferevent_filter_type {
/** filter is being used for input */
BEV_INPUT = 0,
/** filter is being used for output */
BEV_OUTPUT = 1
};
/**
Inserts a filter into the processing of data for bufferevent.
A filter can be inserted only once. It can not be used again for
another insert unless it have been removed via
bufferevent_filter_remove() first.
Input filters are inserted at the end, output filters at the
beginning of the queue.
@param bufev the bufferevent object into which to install the filter
@param filter_type either BEV_INPUT or BEV_OUTPUT
@param filter the filter object
@see bufferevent_filter_remove()
*/
void bufferevent_filter_insert(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter);
/**
Removes a filter from the bufferevent.
A filter should be flushed via buffervent_trigger_filter before removing
it from a bufferevent. Any remaining intermediate buffer data is going
to be lost.
@param bufev the bufferevent object from which to remove the filter
@param filter_type either BEV_INPUT or BEV_OUTPUT
@param filter the filter object or NULL to trigger all filters
@see bufferevent_trigger_filter()
*/
void bufferevent_filter_remove(struct bufferevent *bufev,
enum bufferevent_filter_type filter_type,
struct bufferevent_filter *filter);
/**
Triggers the filter chain the specified filter to produce more
data is possible. This is primarily for time-based filters such
as rate-limiting to produce more data as time passes.
@param bufev the bufferevent object to which the filter belongs
@param filter the bufferevent filter at which to start
@param iotype either BEV_INPUT or BEV_OUTPUT depending on where the filter
was installed
@param state either BEV_NORMAL or BEV_FLUSH
@return -1 on failure, 0 if no data was produces, 1 if data was produced
*/
int
bufferevent_trigger_filter(struct bufferevent *bufev,
struct bufferevent_filter *filter, int iotype,
enum bufferevent_filter_state state);
#ifdef __cplusplus
}

View File

@ -64,6 +64,25 @@ struct event_watermark {
size_t high;
};
struct bufferevent_filter;
/* Fix so that ppl dont have to run with <sys/queue.h> */
#ifndef TAILQ_HEAD
#define _EVENT_DEFINED_TQHEAD
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
}
#endif /* !TAILQ_HEAD */
TAILQ_HEAD(bufferevent_filterq, bufferevent_filter);
#ifdef _EVENT_DEFINED_TQHEAD
#undef TAILQ_HEAD
#undef _EVENT_DEFINED_TQHEAD
#endif /* _EVENT_DEFINED_TQHEAD */
struct bufferevent {
struct event_base *ev_base;
@ -85,8 +104,11 @@ struct bufferevent {
int timeout_write; /* in seconds */
short enabled; /* events that are currently enabled */
};
/** the list of input and output filters */
struct bufferevent_filterq input_filters;
struct bufferevent_filterq output_filters;
};
#ifdef __cplusplus
}

View File

@ -1443,6 +1443,101 @@ test_bufferevent_watermarks(void)
cleanup_test();
}
/*
* Test bufferevent filters
*/
/* strip an 'x' from each byte */
static enum bufferevent_filter_result
bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx)
{
const unsigned char *buffer;
int i;
if (state == BEV_FREE_DATA)
return (BEV_OK);
buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src));
for (i = 0; i < EVBUFFER_LENGTH(src); i += 2) {
assert(buffer[i] == 'x');
evbuffer_add(dst, buffer + i + 1, 1);
if (i + 2 > EVBUFFER_LENGTH(src))
break;
}
evbuffer_drain(src, i);
return (BEV_OK);
}
/* add an 'x' before each byte */
static enum bufferevent_filter_result
bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
enum bufferevent_filter_state state, void *ctx)
{
const unsigned char *buffer;
int i;
if (state == BEV_FREE_DATA)
return (BEV_OK);
buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src));
for (i = 0; i < EVBUFFER_LENGTH(src); ++i) {
evbuffer_add(dst, "x", 1);
evbuffer_add(dst, buffer + i, 1);
}
evbuffer_drain(src, EVBUFFER_LENGTH(src));
return (BEV_OK);
}
static void
test_bufferevent_filters(void)
{
struct bufferevent *bev1, *bev2;
struct bufferevent_filter *finput, *foutput;
char buffer[8333];
int i;
setup_test("Bufferevent Filters: ");
bev1 = bufferevent_new(pair[0], NULL, writecb, errorcb, NULL);
bev2 = bufferevent_new(pair[1], readcb, NULL, errorcb, NULL);
bufferevent_disable(bev1, EV_READ);
bufferevent_enable(bev2, EV_READ);
for (i = 0; i < sizeof(buffer); i++)
buffer[i] = i;
/* insert some filters */
finput = bufferevent_filter_new(
NULL, NULL,bufferevent_input_filter, NULL);
foutput = bufferevent_filter_new(
NULL, NULL, bufferevent_output_filter, NULL);
bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput);
bufferevent_filter_insert(bev2, BEV_INPUT, finput);
bufferevent_write(bev1, buffer, sizeof(buffer));
event_dispatch();
bufferevent_filter_remove(bev1, BEV_OUTPUT, foutput);
bufferevent_filter_free(foutput);
bufferevent_free(bev1);
bufferevent_free(bev2);
if (test_ok != 2)
test_ok = 0;
cleanup_test();
}
struct test_pri_event {
struct event ev;
int count;
@ -1944,6 +2039,7 @@ main (int argc, char **argv)
test_bufferevent();
test_bufferevent_watermarks();
test_bufferevent_filters();
test_free_active_base();