generate client request code via macro; flesh out the pools a little bit.

svn:r268
This commit is contained in:
Niels Provos 2006-11-20 03:32:53 +00:00
parent 3a15f7d4e4
commit fda1216b6e
5 changed files with 285 additions and 14 deletions

View File

@ -27,6 +27,8 @@
#ifndef _EVRPC_INTERNAL_H_
#define _EVRPC_INTERNAL_H_
#include "http-internal.h"
struct evrpc;
#define EVRPC_URI_PREFIX "/.rpc."
@ -42,4 +44,12 @@ struct evrpc_base {
struct evrpc_req_generic;
void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
/* A pool for holding evhttp_connection objects */
struct evrpc_pool {
struct evconq connections;
TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests;
};
#endif /* _EVRPC_INTERNAL_H_ */

159
evrpc.c
View File

@ -36,6 +36,7 @@
#endif
#include <sys/types.h>
#include <sys/tree.h>
#include <sys/socket.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#else
@ -89,25 +90,32 @@ void evrpc_request_done(struct evrpc_req_generic*);
* calls this function.
*/
int
evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
char *
evrpc_construct_uri(const char *uri)
{
char *constructed_uri;
int constructed_uri_len;
rpc->cb = cb;
rpc->cb_arg = cb_arg;
constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(rpc->uri) + 1;
constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
event_err(1, "%s: failed to register rpc at %s",
__func__, rpc->uri);
__func__, uri);
memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX),
rpc->uri, strlen(rpc->uri));
memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
constructed_uri[constructed_uri_len - 1] = '\0';
return (constructed_uri);
}
int
evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
{
char *constructed_uri = evrpc_construct_uri(rpc->uri);
rpc->cb = cb;
rpc->cb_arg = cb_arg;
TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
evhttp_set_cb(base->http_server,
@ -216,3 +224,134 @@ error:
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
return;
}
/* Client implementation of RPC site */
static int evrpc_schedule_request(struct evhttp_connection *connection,
struct evrpc_request_wrapper *ctx);
struct evrpc_pool *
evrpc_pool_new()
{
struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
if (pool == NULL)
return (NULL);
TAILQ_INIT(&pool->connections);
TAILQ_INIT(&pool->requests);
return (pool);
}
void
evrpc_pool_free(struct evrpc_pool *pool)
{
struct evhttp_connection *connection;
struct evrpc_request_wrapper *request;
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
TAILQ_REMOVE(&pool->requests, request, next);
/* if this gets more complicated we need our own function */
free(request->name);
free(request);
}
while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
TAILQ_REMOVE(&pool->connections, connection, next);
evhttp_connection_free(connection);
}
free(pool);
}
void
evrpc_pool_add_connection(struct evrpc_pool *pool,
struct evhttp_connection *connection) {
assert(connection->http_server == NULL);
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
/*
* if we have any requests, pending schedule them with the new
* connections.
*/
if (TAILQ_FIRST(&pool->requests) != NULL) {
struct evrpc_request_wrapper *request =
TAILQ_FIRST(&pool->requests);
TAILQ_REMOVE(&pool->requests, request, next);
evrpc_schedule_request(connection, request);
}
}
static void evrpc_reply_done(struct evhttp_request *, void *);
/*
* Finds a connection object associated with the pool that is currently
* idle and can be used to make a request.
*/
static struct evhttp_connection *
evrpc_pool_find_connection(struct evrpc_pool *pool)
{
struct evhttp_connection *connection;
TAILQ_FOREACH(connection, &pool->connections, next) {
if (TAILQ_FIRST(&connection->requests) == NULL)
return (connection);
}
return (NULL);
}
/*
* We assume that the ctx is no longer queued on the pool.
*/
static int
evrpc_schedule_request(struct evhttp_connection *connection,
struct evrpc_request_wrapper *ctx)
{
struct evbuffer *output;
struct evhttp_request *req;
if ((output = evbuffer_new()) == NULL)
goto error;
if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
goto error;
return (0);
error:
(*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
free(ctx);
return (-1);
}
int
evrpc_make_request(struct evrpc_request_wrapper *ctx)
{
struct evrpc_pool *pool = ctx->pool;
struct evhttp_connection *connection;
/* we better have some available connections on the pool */
assert(TAILQ_FIRST(&pool->connections) != NULL);
/* even if a connection might be available, we do FIFO */
if (TAILQ_FIRST(&pool->requests) == NULL) {
connection = evrpc_pool_find_connection(pool);
if (connection != NULL)
return evrpc_schedule_request(connection, ctx);
}
/*
* if no connection is available, we queue the request on the pool,
* the next time a connection is empty, the rpc will be send on that.
*/
TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
return (0);
}
static void
evrpc_reply_done(struct evhttp_request *req, void *arg)
{
struct evrpc_request_wrapper *ctx = arg;
}

122
evrpc.h
View File

@ -27,6 +27,40 @@
#ifndef _EVRPC_H_
#define _EVRPC_H_
/*
* This header files provides basic support for an RPC server and client.
*
* To support RPCs in a server, every supported RPC command needs to be
* defined and registered.
*
* EVRPC_HEADER(SendCommand, Request, Reply);
*
* SendCommand is the name of the RPC command.
* Request is the name of a structure generated by event_rpcgen.py.
* It contains all parameters relating to the SendCommand RPC. The
* server needs to fill in the Reply structure.
* Reply is the name of a structure generated by event_rpcgen.py. It
* contains the answer to the RPC.
*
* To register an RPC with an HTTP server, you need to first create an RPC
* base with:
*
* struct evrpc_base *base = evrpc_init(http);
*
* A specific RPC can then be registered with
*
* EVRPC_REGISTER(base, "SendCommand", Request, Reply, FunctionCB, arg);
*
* when the server receives an appropriately formatted RPC, the user callback
* is invokved. The callback needs to fill in the reply structure.
*
* void FunctionCB(EVRPC_STRUCT(SendCommand)* rpc, void *arg);
*
* To send the reply, call EVRPC_REQUEST_DONE(rpc);
*
* See the regression test for an example.
*/
struct evbuffer;
struct evrpc_req_generic;
@ -40,7 +74,7 @@ struct evrpc {
/* creates a new request structure */
void *(*request_new)(void);
/* creates a new request structure */
/* frees the request structure */
void (*request_free)(void *);
/* unmarshals the buffer into the proper request structure */
@ -92,14 +126,51 @@ struct evrpc_req_generic {
void (*done)(struct evrpc_req_generic* rpc);
};
#define EVRPC_DEFINE(rpcname, reqstruct, rplystruct) \
/*
* You need to use EVRPC_HEADER to create structures and function prototypes
* needed by the server and client implmentation.
*/
#define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \
EVRPC_STRUCT(rpcname) { \
struct reqstruct* request; \
struct rplystruct* reply; \
struct evrpc* rpc; \
void (*done)(struct evrpc* rpc, void *request, void *reply); \
}; \
int evrpc_send_request_##rpcname(struct evrpc_pool *, \
struct reqstruct *, struct rplystruct *, \
void (*)(struct reqstruct *, struct rplystruct *, void *cbarg), \
void *);
#define EVRPC_GENERATE(rpcname, reqstruct, rplystruct) \
int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \
struct reqstruct *request, struct rplystruct *reply, \
void (*cb)(struct reqstruct *, struct rplystruct *, void *cbarg), \
void *cbarg) { \
struct evrpc_request_wrapper *ctx; \
ctx = malloc(sizeof(struct evrpc_request_wrapper)); \
if (ctx == NULL) { \
(*(cb))(request, reply, cbarg); \
return (-1); \
} \
ctx->pool = pool; \
ctx->name = strdup(#rpcname); \
if (ctx->name == NULL) { \
free(ctx); \
(*(cb))(request, reply, cbarg); \
return (-1); \
} \
ctx->cb = (void (*)(void *, void *, void *))cb; \
ctx->cb_arg = cbarg; \
ctx->request = (void *)request; \
ctx->reply = (void *)reply; \
ctx->request_marshal = (void (*)(struct evbuffer *, void *))reqstruct##_marshal; \
ctx->reply_clear = (void (*)(void *))rplystruct##_clear; \
ctx->reply_unmarshal = (int (*)(void *, struct evbuffer *))rplystruct##_unmarshal; \
return (evrpc_make_request(ctx)); \
}
/*
* EVRPC_REQUEST_DONE is used to answer a request; the reply is expected
* to have been filled in. The request and reply pointers become invalid
@ -144,4 +215,51 @@ struct evrpc_base *evrpc_init(struct evhttp *server);
int evrpc_register_rpc(struct evrpc_base *, struct evrpc *,
void (*)(struct evrpc_req_generic*, void *), void *);
/*
* Client-side RPC support
*/
struct evrpc_pool;
struct evhttp_connection;
struct evrpc_request_wrapper {
TAILQ_ENTRY(evrpc_request_wrapper) next;
/* pool on which this rpc request is being made */
struct evrpc_pool *pool;
/* the name of the rpc */
char *name;
/* callback */
void (*cb)(void *request, void *reply, void *arg);
void *cb_arg;
void *request;
void *reply;
/* unmarshals the buffer into the proper request structure */
void (*request_marshal)(struct evbuffer *, void *);
/* removes all stored state in the reply */
void (*reply_clear)(void *);
/* marshals the reply into a buffer */
int (*reply_unmarshal)(void *, struct evbuffer*);
};
#define EVRPC_MAKE_REQUEST(name, request, reply, cb, cbarg) \
evrpc_send_request_##name(pool, request, reply, cb, cbarg)
int evrpc_make_request(struct evrpc_request_wrapper *);
/*
* a pool has a number of connections associated with it.
* rpc requests are always made via a pool.
*/
struct evrpc_pool *evrpc_pool_new();
void evrpc_pool_free(struct evrpc_pool *);
void evrpc_pool_add_connection(struct evrpc_pool *,
struct evhttp_connection *);
#endif /* _EVRPC_H_ */

View File

@ -65,11 +65,14 @@ struct evhttp_cb {
void *cbarg;
};
/* both the http server as well as the rpc system need to queue connections */
TAILQ_HEAD(evconq, evhttp_connection);
struct evhttp {
struct event bind_ev;
TAILQ_HEAD(httpcbq, evhttp_cb) callbacks;
TAILQ_HEAD(evconq, evhttp_connection) connections;
struct evconq connections;
void (*gencb)(struct evhttp_request *req, void *);
void *gencbarg;

View File

@ -84,7 +84,8 @@ http_setup(short *pport)
return (myhttp);
}
EVRPC_DEFINE(Message, msg, kill);
EVRPC_HEADER(Message, msg, kill);
EVRPC_GENERATE(Message, msg, kill);
void
MessageCB(EVRPC_STRUCT(Message)* rpc, void *arg)