simple hooks for processing incoming and outgoing rpcs

svn:r466
This commit is contained in:
Niels Provos 2007-11-02 06:34:04 +00:00
parent 18ac92486f
commit 65236aa857
5 changed files with 192 additions and 3 deletions

View File

@ -29,3 +29,4 @@ Changes in current version:
o Fix implementation of strsep on platforms that lack it
o Fix implementation of getaddrinfo on platforms that lack it; mainly, this will make Windows http.c work better. Original patch by Lubomir Marinov.
o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa
o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication.

View File

@ -33,12 +33,24 @@ struct evrpc;
#define EVRPC_URI_PREFIX "/.rpc."
struct evrpc_hook {
TAILQ_ENTRY(evrpc_hook) (next);
/* returns -1; if the rpc should be aborted, is allowed to rewrite */
int (*process)(struct evhttp_request *, struct evbuffer *, void *);
void *process_arg;
};
struct evrpc_base {
/* the HTTP server under which we register our RPC calls */
struct evhttp* http_server;
/* a list of all RPCs registered with us */
TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs;
/* hooks for processing outbound and inbound rpcs */
TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks;
struct evrpc_hook_list output_hooks;
};
struct evrpc_req_generic;

108
evrpc.c
View File

@ -74,6 +74,8 @@ evrpc_init(struct evhttp *http_server)
evtag_init();
TAILQ_INIT(&base->registered_rpcs);
TAILQ_INIT(&base->input_hooks);
TAILQ_INIT(&base->output_hooks);
base->http_server = http_server;
return (base);
@ -83,14 +85,95 @@ void
evrpc_free(struct evrpc_base *base)
{
struct evrpc *rpc;
struct evrpc_hook *hook;
while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
assert(evrpc_unregister_rpc(base, rpc->uri));
}
while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
assert(evrpc_remove_hook(base, INPUT, hook));
}
while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
assert(evrpc_remove_hook(base, OUTPUT, hook));
}
free(base);
}
void *
evrpc_add_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg)
{
struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
head = &base->input_hooks;
break;
case OUTPUT:
head = &base->output_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}
hook = calloc(1, sizeof(struct evrpc_hook));
assert(hook != NULL);
hook->process = cb;
hook->process_arg = cb_arg;
TAILQ_INSERT_TAIL(head, hook, next);
return (hook);
}
/*
* remove the hook specified by the handle
*/
int
evrpc_remove_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
void *handle)
{
struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
head = &base->input_hooks;
break;
case OUTPUT:
head = &base->output_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}
TAILQ_FOREACH(hook, head, next) {
if (hook == handle) {
TAILQ_REMOVE(head, hook, next);
free(hook);
return (1);
}
}
return (0);
}
static int
evrpc_process_hooks(struct evrpc_hook_list *head,
struct evhttp_request *req, struct evbuffer *evbuf)
{
struct evrpc_hook *hook;
TAILQ_FOREACH(hook, head, next) {
if (hook->process(req, evbuf, hook->process_arg) == -1)
return (-1);
}
return (0);
}
static void evrpc_pool_schedule(struct evrpc_pool *pool);
static void evrpc_request_cb(struct evhttp_request *, void *);
void evrpc_request_done(struct evrpc_req_generic*);
@ -124,6 +207,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
{
char *constructed_uri = evrpc_construct_uri(rpc->uri);
rpc->base = base;
rpc->cb = cb;
rpc->cb_arg = cb_arg;
@ -179,6 +263,15 @@ evrpc_request_cb(struct evhttp_request *req, void *arg)
EVBUFFER_LENGTH(req->input_buffer) <= 0)
goto error;
/*
* we might want to allow hooks to suspend the processing,
* but at the moment, we assume that they just act as simple
* filters.
*/
if (evrpc_process_hooks(&rpc->base->input_hooks,
req, req->input_buffer) == -1)
goto error;
rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
if (rpc_state == NULL)
goto error;
@ -236,7 +329,7 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
{
struct evhttp_request *req = rpc_state->http_req;
struct evrpc *rpc = rpc_state->rpc;
struct evbuffer* data;
struct evbuffer* data = NULL;
if (rpc->reply_complete(rpc_state->reply) == -1) {
/* the reply was not completely filled in. error out */
@ -251,6 +344,11 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
/* serialize the reply */
rpc->reply_marshal(data, rpc_state->reply);
/* do hook based tweaks to the request */
if (evrpc_process_hooks(&rpc->base->output_hooks,
req, data) == -1)
goto error;
evhttp_send_reply(req, HTTP_OK, "OK", data);
evbuffer_free(data);
@ -260,6 +358,8 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
return;
error:
if (data != NULL)
evbuffer_free(data);
evrpc_reqstate_free(rpc_state);
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
return;
@ -460,6 +560,8 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
event_del(&ctx->ev_timeout);
memset(&status, 0, sizeof(status));
status.http_req = req;
/* we need to get the reply now */
if (req != NULL) {
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);

34
evrpc.h
View File

@ -99,6 +99,9 @@ struct evrpc {
/* the callback invoked for each received rpc */
void (*cb)(struct evrpc_req_generic *, void *);
void *cb_arg;
/* reference for further configuration */
struct evrpc_base *base;
};
#define EVRPC_STRUCT(rpcname) struct evrpc_req__##rpcname
@ -140,6 +143,7 @@ EVRPC_STRUCT(rpcname) { \
struct reqstruct* request; \
struct rplystruct* reply; \
struct evrpc* rpc; \
struct evhttp_request* http_req; \
void (*done)(struct evrpc_status *, \
struct evrpc* rpc, void *request, void *reply); \
}; \
@ -184,6 +188,11 @@ error: \
return (-1); \
}
/*
* Access to the underlying http object; can be used to look at headers or
* for getting the remote ip address
*/
#define EVRPC_REQUEST_HTTP(rpc_req) (rpc_req)->http_req
/*
* EVRPC_REQUEST_DONE is used to answer a request; the reply is expected
@ -252,6 +261,9 @@ struct evrpc_status {
#define EVRPC_STATUS_ERR_BADPAYLOAD 2
#define EVRPC_STATUS_ERR_UNSTARTED 3
int error;
/* for looking at headers or other information */
struct evhttp_request *http_req;
};
struct evrpc_request_wrapper {
@ -313,6 +325,28 @@ void evrpc_pool_add_connection(struct evrpc_pool *,
*/
void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
/*
* Hooks for changing the input and output of RPCs; this can be used to
* implement compression, authentication, encryption, ...
*
* If a hook returns -1, the processing is aborted.
*
* The add functions return handles that can be used for removing hooks.
*/
enum EVRPC_HOOK_TYPE {
INPUT, OUTPUT
};
void *evrpc_add_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg);
int evrpc_remove_hook(struct evrpc_base *base,
enum EVRPC_HOOK_TYPE hook_type,
void *handle);
#ifdef __cplusplus
}
#endif

View File

@ -91,11 +91,21 @@ EVRPC_HEADER(NeverReply, msg, kill);
EVRPC_GENERATE(Message, msg, kill);
EVRPC_GENERATE(NeverReply, msg, kill);
static int need_input_hook = 0;
static int need_output_hook = 0;
void
MessageCb(EVRPC_STRUCT(Message)* rpc, void *arg)
{
struct kill* kill_reply = rpc->reply;
if (need_input_hook) {
struct evhttp_request* req = EVRPC_REQUEST_HTTP(rpc);
const char *header = evhttp_find_header(
req->input_headers, "X-Hook");
assert(strcmp(header, "input") == 0);
}
/* we just want to fill in some non-sense */
EVTAG_ASSIGN(kill_reply, weapon, "dagger");
EVTAG_ASSIGN(kill_reply, action, "wave around like an idiot");
@ -129,6 +139,9 @@ rpc_setup(struct evhttp **phttp, short *pport, struct evrpc_base **pbase)
*phttp = http;
*pport = port;
*pbase = base;
need_input_hook = 0;
need_output_hook = 0;
}
static void
@ -330,6 +343,13 @@ GotKillCb(struct evrpc_status *status,
char *weapon;
char *action;
if (need_output_hook) {
struct evhttp_request *req = status->http_req;
const char *header = evhttp_find_header(
req->input_headers, "X-Hook");
assert(strcmp(header, "output") == 0);
}
if (status->error != EVRPC_STATUS_ERR_NONE)
goto done;
@ -386,6 +406,18 @@ done:
event_loopexit(NULL);
}
static int
rpc_hook_add_header(struct evhttp_request *req,
struct evbuffer *evbuf, void *arg)
{
const char *hook_type = arg;
if (strcmp("input", hook_type) == 0)
evhttp_add_header(req->input_headers, "X-Hook", hook_type);
else
evhttp_add_header(req->output_headers, "X-Hook", hook_type);
return (0);
}
static void
rpc_basic_client(void)
{
@ -400,6 +432,14 @@ rpc_basic_client(void)
rpc_setup(&http, &port, &base);
need_input_hook = 1;
need_output_hook = 1;
assert(evrpc_add_hook(base, INPUT, rpc_hook_add_header, "input")
!= NULL);
assert(evrpc_add_hook(base, OUTPUT, rpc_hook_add_header, "output")
!= NULL);
pool = rpc_pool_with_connection(port);
/* set up the basic message */