From fda1216b6e18df98b8f3c32ce4132f955c346a90 Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Mon, 20 Nov 2006 03:32:53 +0000 Subject: [PATCH] generate client request code via macro; flesh out the pools a little bit. svn:r268 --- evrpc-internal.h | 10 +++ evrpc.c | 159 ++++++++++++++++++++++++++++++++++++++++++--- evrpc.h | 122 +++++++++++++++++++++++++++++++++- http-internal.h | 5 +- test/regress_rpc.c | 3 +- 5 files changed, 285 insertions(+), 14 deletions(-) diff --git a/evrpc-internal.h b/evrpc-internal.h index 4a27a364..de2ab47d 100644 --- a/evrpc-internal.h +++ b/evrpc-internal.h @@ -27,6 +27,8 @@ #ifndef _EVRPC_INTERNAL_H_ #define _EVRPC_INTERNAL_H_ +#include "http-internal.h" + struct evrpc; #define EVRPC_URI_PREFIX "/.rpc." @@ -42,4 +44,12 @@ struct evrpc_base { struct evrpc_req_generic; void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state); +/* A pool for holding evhttp_connection objects */ +struct evrpc_pool { + struct evconq connections; + + TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests; +}; + + #endif /* _EVRPC_INTERNAL_H_ */ diff --git a/evrpc.c b/evrpc.c index 74d6601a..2b94d86a 100644 --- a/evrpc.c +++ b/evrpc.c @@ -36,6 +36,7 @@ #endif #include #include +#include #ifdef HAVE_SYS_TIME_H #include #else @@ -89,25 +90,32 @@ void evrpc_request_done(struct evrpc_req_generic*); * calls this function. */ -int -evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, - void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) +char * +evrpc_construct_uri(const char *uri) { char *constructed_uri; int constructed_uri_len; - rpc->cb = cb; - rpc->cb_arg = cb_arg; - - constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(rpc->uri) + 1; + constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; if ((constructed_uri = malloc(constructed_uri_len)) == NULL) event_err(1, "%s: failed to register rpc at %s", - __func__, rpc->uri); + __func__, uri); memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); - memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), - rpc->uri, strlen(rpc->uri)); + memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); constructed_uri[constructed_uri_len - 1] = '\0'; + return (constructed_uri); +} + +int +evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, + void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) +{ + char *constructed_uri = evrpc_construct_uri(rpc->uri); + + rpc->cb = cb; + rpc->cb_arg = cb_arg; + TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); evhttp_set_cb(base->http_server, @@ -216,3 +224,134 @@ error: evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error"); return; } + +/* Client implementation of RPC site */ + +static int evrpc_schedule_request(struct evhttp_connection *connection, + struct evrpc_request_wrapper *ctx); + +struct evrpc_pool * +evrpc_pool_new() +{ + struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool)); + if (pool == NULL) + return (NULL); + + TAILQ_INIT(&pool->connections); + TAILQ_INIT(&pool->requests); + + return (pool); +} + +void +evrpc_pool_free(struct evrpc_pool *pool) +{ + struct evhttp_connection *connection; + struct evrpc_request_wrapper *request; + + while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { + TAILQ_REMOVE(&pool->requests, request, next); + /* if this gets more complicated we need our own function */ + free(request->name); + free(request); + } + + while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { + TAILQ_REMOVE(&pool->connections, connection, next); + evhttp_connection_free(connection); + } + + free(pool); +} + +void +evrpc_pool_add_connection(struct evrpc_pool *pool, + struct evhttp_connection *connection) { + assert(connection->http_server == NULL); + TAILQ_INSERT_TAIL(&pool->connections, connection, next); + + /* + * if we have any requests, pending schedule them with the new + * connections. + */ + + if (TAILQ_FIRST(&pool->requests) != NULL) { + struct evrpc_request_wrapper *request = + TAILQ_FIRST(&pool->requests); + TAILQ_REMOVE(&pool->requests, request, next); + evrpc_schedule_request(connection, request); + } +} + + +static void evrpc_reply_done(struct evhttp_request *, void *); + +/* + * Finds a connection object associated with the pool that is currently + * idle and can be used to make a request. + */ +static struct evhttp_connection * +evrpc_pool_find_connection(struct evrpc_pool *pool) +{ + struct evhttp_connection *connection; + TAILQ_FOREACH(connection, &pool->connections, next) { + if (TAILQ_FIRST(&connection->requests) == NULL) + return (connection); + } + + return (NULL); +} + +/* + * We assume that the ctx is no longer queued on the pool. + */ +static int +evrpc_schedule_request(struct evhttp_connection *connection, + struct evrpc_request_wrapper *ctx) +{ + struct evbuffer *output; + struct evhttp_request *req; + if ((output = evbuffer_new()) == NULL) + goto error; + + if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) + goto error; + + return (0); + +error: + (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg); + free(ctx); + return (-1); +} + +int +evrpc_make_request(struct evrpc_request_wrapper *ctx) +{ + struct evrpc_pool *pool = ctx->pool; + struct evhttp_connection *connection; + + /* we better have some available connections on the pool */ + assert(TAILQ_FIRST(&pool->connections) != NULL); + + + /* even if a connection might be available, we do FIFO */ + if (TAILQ_FIRST(&pool->requests) == NULL) { + connection = evrpc_pool_find_connection(pool); + if (connection != NULL) + return evrpc_schedule_request(connection, ctx); + } + + /* + * if no connection is available, we queue the request on the pool, + * the next time a connection is empty, the rpc will be send on that. + */ + TAILQ_INSERT_TAIL(&pool->requests, ctx, next); + return (0); +} + +static void +evrpc_reply_done(struct evhttp_request *req, void *arg) +{ + struct evrpc_request_wrapper *ctx = arg; +} diff --git a/evrpc.h b/evrpc.h index 18db7f1b..5c9d2a81 100644 --- a/evrpc.h +++ b/evrpc.h @@ -27,6 +27,40 @@ #ifndef _EVRPC_H_ #define _EVRPC_H_ +/* + * This header files provides basic support for an RPC server and client. + * + * To support RPCs in a server, every supported RPC command needs to be + * defined and registered. + * + * EVRPC_HEADER(SendCommand, Request, Reply); + * + * SendCommand is the name of the RPC command. + * Request is the name of a structure generated by event_rpcgen.py. + * It contains all parameters relating to the SendCommand RPC. The + * server needs to fill in the Reply structure. + * Reply is the name of a structure generated by event_rpcgen.py. It + * contains the answer to the RPC. + * + * To register an RPC with an HTTP server, you need to first create an RPC + * base with: + * + * struct evrpc_base *base = evrpc_init(http); + * + * A specific RPC can then be registered with + * + * EVRPC_REGISTER(base, "SendCommand", Request, Reply, FunctionCB, arg); + * + * when the server receives an appropriately formatted RPC, the user callback + * is invokved. The callback needs to fill in the reply structure. + * + * void FunctionCB(EVRPC_STRUCT(SendCommand)* rpc, void *arg); + * + * To send the reply, call EVRPC_REQUEST_DONE(rpc); + * + * See the regression test for an example. + */ + struct evbuffer; struct evrpc_req_generic; @@ -40,7 +74,7 @@ struct evrpc { /* creates a new request structure */ void *(*request_new)(void); - /* creates a new request structure */ + /* frees the request structure */ void (*request_free)(void *); /* unmarshals the buffer into the proper request structure */ @@ -92,14 +126,51 @@ struct evrpc_req_generic { void (*done)(struct evrpc_req_generic* rpc); }; -#define EVRPC_DEFINE(rpcname, reqstruct, rplystruct) \ +/* + * You need to use EVRPC_HEADER to create structures and function prototypes + * needed by the server and client implmentation. + */ +#define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \ EVRPC_STRUCT(rpcname) { \ struct reqstruct* request; \ struct rplystruct* reply; \ struct evrpc* rpc; \ void (*done)(struct evrpc* rpc, void *request, void *reply); \ +}; \ +int evrpc_send_request_##rpcname(struct evrpc_pool *, \ + struct reqstruct *, struct rplystruct *, \ + void (*)(struct reqstruct *, struct rplystruct *, void *cbarg), \ + void *); + +#define EVRPC_GENERATE(rpcname, reqstruct, rplystruct) \ +int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \ + struct reqstruct *request, struct rplystruct *reply, \ + void (*cb)(struct reqstruct *, struct rplystruct *, void *cbarg), \ + void *cbarg) { \ + struct evrpc_request_wrapper *ctx; \ + ctx = malloc(sizeof(struct evrpc_request_wrapper)); \ + if (ctx == NULL) { \ + (*(cb))(request, reply, cbarg); \ + return (-1); \ + } \ + ctx->pool = pool; \ + ctx->name = strdup(#rpcname); \ + if (ctx->name == NULL) { \ + free(ctx); \ + (*(cb))(request, reply, cbarg); \ + return (-1); \ + } \ + ctx->cb = (void (*)(void *, void *, void *))cb; \ + ctx->cb_arg = cbarg; \ + ctx->request = (void *)request; \ + ctx->reply = (void *)reply; \ + ctx->request_marshal = (void (*)(struct evbuffer *, void *))reqstruct##_marshal; \ + ctx->reply_clear = (void (*)(void *))rplystruct##_clear; \ + ctx->reply_unmarshal = (int (*)(void *, struct evbuffer *))rplystruct##_unmarshal; \ + return (evrpc_make_request(ctx)); \ } + /* * EVRPC_REQUEST_DONE is used to answer a request; the reply is expected * to have been filled in. The request and reply pointers become invalid @@ -144,4 +215,51 @@ struct evrpc_base *evrpc_init(struct evhttp *server); int evrpc_register_rpc(struct evrpc_base *, struct evrpc *, void (*)(struct evrpc_req_generic*, void *), void *); +/* + * Client-side RPC support + */ + +struct evrpc_pool; +struct evhttp_connection; + +struct evrpc_request_wrapper { + TAILQ_ENTRY(evrpc_request_wrapper) next; + + /* pool on which this rpc request is being made */ + struct evrpc_pool *pool; + + /* the name of the rpc */ + char *name; + + /* callback */ + void (*cb)(void *request, void *reply, void *arg); + void *cb_arg; + + void *request; + void *reply; + + /* unmarshals the buffer into the proper request structure */ + void (*request_marshal)(struct evbuffer *, void *); + + /* removes all stored state in the reply */ + void (*reply_clear)(void *); + + /* marshals the reply into a buffer */ + int (*reply_unmarshal)(void *, struct evbuffer*); +}; + +#define EVRPC_MAKE_REQUEST(name, request, reply, cb, cbarg) \ + evrpc_send_request_##name(pool, request, reply, cb, cbarg) + +int evrpc_make_request(struct evrpc_request_wrapper *); + +/* + * a pool has a number of connections associated with it. + * rpc requests are always made via a pool. + */ +struct evrpc_pool *evrpc_pool_new(); +void evrpc_pool_free(struct evrpc_pool *); +void evrpc_pool_add_connection(struct evrpc_pool *, + struct evhttp_connection *); + #endif /* _EVRPC_H_ */ diff --git a/http-internal.h b/http-internal.h index 17015e1e..f6467f0e 100644 --- a/http-internal.h +++ b/http-internal.h @@ -65,11 +65,14 @@ struct evhttp_cb { void *cbarg; }; +/* both the http server as well as the rpc system need to queue connections */ +TAILQ_HEAD(evconq, evhttp_connection); + struct evhttp { struct event bind_ev; TAILQ_HEAD(httpcbq, evhttp_cb) callbacks; - TAILQ_HEAD(evconq, evhttp_connection) connections; + struct evconq connections; void (*gencb)(struct evhttp_request *req, void *); void *gencbarg; diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 59113bf5..7e4b3e38 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -84,7 +84,8 @@ http_setup(short *pport) return (myhttp); } -EVRPC_DEFINE(Message, msg, kill); +EVRPC_HEADER(Message, msg, kill); +EVRPC_GENERATE(Message, msg, kill); void MessageCB(EVRPC_STRUCT(Message)* rpc, void *arg)