From 65236aa8578aeb17c088b41818da17311672aed1 Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Fri, 2 Nov 2007 06:34:04 +0000 Subject: [PATCH] simple hooks for processing incoming and outgoing rpcs svn:r466 --- ChangeLog | 1 + evrpc-internal.h | 12 +++++ evrpc.c | 108 +++++++++++++++++++++++++++++++++++++++++++-- evrpc.h | 34 ++++++++++++++ test/regress_rpc.c | 40 +++++++++++++++++ 5 files changed, 192 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2b811087..755a6568 100644 --- a/ChangeLog +++ b/ChangeLog @@ -29,3 +29,4 @@ Changes in current version: o Fix implementation of strsep on platforms that lack it o Fix implementation of getaddrinfo on platforms that lack it; mainly, this will make Windows http.c work better. Original patch by Lubomir Marinov. o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa + o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication. diff --git a/evrpc-internal.h b/evrpc-internal.h index 656533b6..8b8dd691 100644 --- a/evrpc-internal.h +++ b/evrpc-internal.h @@ -33,12 +33,24 @@ struct evrpc; #define EVRPC_URI_PREFIX "/.rpc." +struct evrpc_hook { + TAILQ_ENTRY(evrpc_hook) (next); + + /* returns -1; if the rpc should be aborted, is allowed to rewrite */ + int (*process)(struct evhttp_request *, struct evbuffer *, void *); + void *process_arg; +}; + struct evrpc_base { /* the HTTP server under which we register our RPC calls */ struct evhttp* http_server; /* a list of all RPCs registered with us */ TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs; + + /* hooks for processing outbound and inbound rpcs */ + TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks; + struct evrpc_hook_list output_hooks; }; struct evrpc_req_generic; diff --git a/evrpc.c b/evrpc.c index 0d24d36c..b1fb4765 100644 --- a/evrpc.c +++ b/evrpc.c @@ -74,6 +74,8 @@ evrpc_init(struct evhttp *http_server) evtag_init(); TAILQ_INIT(&base->registered_rpcs); + TAILQ_INIT(&base->input_hooks); + TAILQ_INIT(&base->output_hooks); base->http_server = http_server; return (base); @@ -83,14 +85,95 @@ void evrpc_free(struct evrpc_base *base) { struct evrpc *rpc; - + struct evrpc_hook *hook; + while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { assert(evrpc_unregister_rpc(base, rpc->uri)); } - + while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { + assert(evrpc_remove_hook(base, INPUT, hook)); + } + while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { + assert(evrpc_remove_hook(base, OUTPUT, hook)); + } free(base); } +void * +evrpc_add_hook(struct evrpc_base *base, + enum EVRPC_HOOK_TYPE hook_type, + int (*cb)(struct evhttp_request *, struct evbuffer *, void *), + void *cb_arg) +{ + struct evrpc_hook_list *head = NULL; + struct evrpc_hook *hook = NULL; + switch (hook_type) { + case INPUT: + head = &base->input_hooks; + break; + case OUTPUT: + head = &base->output_hooks; + break; + default: + assert(hook_type == INPUT || hook_type == OUTPUT); + } + + hook = calloc(1, sizeof(struct evrpc_hook)); + assert(hook != NULL); + + hook->process = cb; + hook->process_arg = cb_arg; + TAILQ_INSERT_TAIL(head, hook, next); + + return (hook); +} + +/* + * remove the hook specified by the handle + */ + +int +evrpc_remove_hook(struct evrpc_base *base, + enum EVRPC_HOOK_TYPE hook_type, + void *handle) +{ + struct evrpc_hook_list *head = NULL; + struct evrpc_hook *hook = NULL; + switch (hook_type) { + case INPUT: + head = &base->input_hooks; + break; + case OUTPUT: + head = &base->output_hooks; + break; + default: + assert(hook_type == INPUT || hook_type == OUTPUT); + } + + TAILQ_FOREACH(hook, head, next) { + if (hook == handle) { + TAILQ_REMOVE(head, hook, next); + free(hook); + return (1); + } + } + + return (0); +} + +static int +evrpc_process_hooks(struct evrpc_hook_list *head, + struct evhttp_request *req, struct evbuffer *evbuf) +{ + struct evrpc_hook *hook; + TAILQ_FOREACH(hook, head, next) { + if (hook->process(req, evbuf, hook->process_arg) == -1) + return (-1); + } + + return (0); +} + static void evrpc_pool_schedule(struct evrpc_pool *pool); static void evrpc_request_cb(struct evhttp_request *, void *); void evrpc_request_done(struct evrpc_req_generic*); @@ -124,6 +207,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, { char *constructed_uri = evrpc_construct_uri(rpc->uri); + rpc->base = base; rpc->cb = cb; rpc->cb_arg = cb_arg; @@ -179,6 +263,15 @@ evrpc_request_cb(struct evhttp_request *req, void *arg) EVBUFFER_LENGTH(req->input_buffer) <= 0) goto error; + /* + * we might want to allow hooks to suspend the processing, + * but at the moment, we assume that they just act as simple + * filters. + */ + if (evrpc_process_hooks(&rpc->base->input_hooks, + req, req->input_buffer) == -1) + goto error; + rpc_state = calloc(1, sizeof(struct evrpc_req_generic)); if (rpc_state == NULL) goto error; @@ -236,7 +329,7 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state) { struct evhttp_request *req = rpc_state->http_req; struct evrpc *rpc = rpc_state->rpc; - struct evbuffer* data; + struct evbuffer* data = NULL; if (rpc->reply_complete(rpc_state->reply) == -1) { /* the reply was not completely filled in. error out */ @@ -251,6 +344,11 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state) /* serialize the reply */ rpc->reply_marshal(data, rpc_state->reply); + /* do hook based tweaks to the request */ + if (evrpc_process_hooks(&rpc->base->output_hooks, + req, data) == -1) + goto error; + evhttp_send_reply(req, HTTP_OK, "OK", data); evbuffer_free(data); @@ -260,6 +358,8 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state) return; error: + if (data != NULL) + evbuffer_free(data); evrpc_reqstate_free(rpc_state); evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error"); return; @@ -460,6 +560,8 @@ evrpc_reply_done(struct evhttp_request *req, void *arg) event_del(&ctx->ev_timeout); memset(&status, 0, sizeof(status)); + status.http_req = req; + /* we need to get the reply now */ if (req != NULL) { res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); diff --git a/evrpc.h b/evrpc.h index 5f45e780..45911146 100644 --- a/evrpc.h +++ b/evrpc.h @@ -99,6 +99,9 @@ struct evrpc { /* the callback invoked for each received rpc */ void (*cb)(struct evrpc_req_generic *, void *); void *cb_arg; + + /* reference for further configuration */ + struct evrpc_base *base; }; #define EVRPC_STRUCT(rpcname) struct evrpc_req__##rpcname @@ -140,6 +143,7 @@ EVRPC_STRUCT(rpcname) { \ struct reqstruct* request; \ struct rplystruct* reply; \ struct evrpc* rpc; \ + struct evhttp_request* http_req; \ void (*done)(struct evrpc_status *, \ struct evrpc* rpc, void *request, void *reply); \ }; \ @@ -184,6 +188,11 @@ error: \ return (-1); \ } +/* + * Access to the underlying http object; can be used to look at headers or + * for getting the remote ip address + */ +#define EVRPC_REQUEST_HTTP(rpc_req) (rpc_req)->http_req /* * EVRPC_REQUEST_DONE is used to answer a request; the reply is expected @@ -252,6 +261,9 @@ struct evrpc_status { #define EVRPC_STATUS_ERR_BADPAYLOAD 2 #define EVRPC_STATUS_ERR_UNSTARTED 3 int error; + + /* for looking at headers or other information */ + struct evhttp_request *http_req; }; struct evrpc_request_wrapper { @@ -313,6 +325,28 @@ void evrpc_pool_add_connection(struct evrpc_pool *, */ void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs); +/* + * Hooks for changing the input and output of RPCs; this can be used to + * implement compression, authentication, encryption, ... + * + * If a hook returns -1, the processing is aborted. + * + * The add functions return handles that can be used for removing hooks. + */ + +enum EVRPC_HOOK_TYPE { + INPUT, OUTPUT +}; + +void *evrpc_add_hook(struct evrpc_base *base, + enum EVRPC_HOOK_TYPE hook_type, + int (*cb)(struct evhttp_request *, struct evbuffer *, void *), + void *cb_arg); + +int evrpc_remove_hook(struct evrpc_base *base, + enum EVRPC_HOOK_TYPE hook_type, + void *handle); + #ifdef __cplusplus } #endif diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 3361f9e9..341cf991 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -91,11 +91,21 @@ EVRPC_HEADER(NeverReply, msg, kill); EVRPC_GENERATE(Message, msg, kill); EVRPC_GENERATE(NeverReply, msg, kill); +static int need_input_hook = 0; +static int need_output_hook = 0; + void MessageCb(EVRPC_STRUCT(Message)* rpc, void *arg) { struct kill* kill_reply = rpc->reply; + if (need_input_hook) { + struct evhttp_request* req = EVRPC_REQUEST_HTTP(rpc); + const char *header = evhttp_find_header( + req->input_headers, "X-Hook"); + assert(strcmp(header, "input") == 0); + } + /* we just want to fill in some non-sense */ EVTAG_ASSIGN(kill_reply, weapon, "dagger"); EVTAG_ASSIGN(kill_reply, action, "wave around like an idiot"); @@ -129,6 +139,9 @@ rpc_setup(struct evhttp **phttp, short *pport, struct evrpc_base **pbase) *phttp = http; *pport = port; *pbase = base; + + need_input_hook = 0; + need_output_hook = 0; } static void @@ -330,6 +343,13 @@ GotKillCb(struct evrpc_status *status, char *weapon; char *action; + if (need_output_hook) { + struct evhttp_request *req = status->http_req; + const char *header = evhttp_find_header( + req->input_headers, "X-Hook"); + assert(strcmp(header, "output") == 0); + } + if (status->error != EVRPC_STATUS_ERR_NONE) goto done; @@ -386,6 +406,18 @@ done: event_loopexit(NULL); } +static int +rpc_hook_add_header(struct evhttp_request *req, + struct evbuffer *evbuf, void *arg) +{ + const char *hook_type = arg; + if (strcmp("input", hook_type) == 0) + evhttp_add_header(req->input_headers, "X-Hook", hook_type); + else + evhttp_add_header(req->output_headers, "X-Hook", hook_type); + return (0); +} + static void rpc_basic_client(void) { @@ -400,6 +432,14 @@ rpc_basic_client(void) rpc_setup(&http, &port, &base); + need_input_hook = 1; + need_output_hook = 1; + + assert(evrpc_add_hook(base, INPUT, rpc_hook_add_header, "input") + != NULL); + assert(evrpc_add_hook(base, OUTPUT, rpc_hook_add_header, "output") + != NULL); + pool = rpc_pool_with_connection(port); /* set up the basic message */