finish RPC client support

svn:r269
This commit is contained in:
Niels Provos 2006-11-20 07:44:37 +00:00
parent fda1216b6e
commit ff43ed5b33
2 changed files with 146 additions and 8 deletions

54
evrpc.c
View File

@ -243,6 +243,13 @@ evrpc_pool_new()
return (pool);
}
static void
evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
{
free(request->name);
free(request);
}
void
evrpc_pool_free(struct evrpc_pool *pool)
{
@ -252,8 +259,7 @@ evrpc_pool_free(struct evrpc_pool *pool)
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
TAILQ_REMOVE(&pool->requests, request, next);
/* if this gets more complicated we need our own function */
free(request->name);
free(request);
evrpc_request_wrapper_free(request);
}
while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
@ -264,6 +270,11 @@ evrpc_pool_free(struct evrpc_pool *pool)
free(pool);
}
/*
* Add a connection to the RPC pool. A request scheduled on the pool
* may use any available connection.
*/
void
evrpc_pool_add_connection(struct evrpc_pool *pool,
struct evhttp_connection *connection) {
@ -271,7 +282,7 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
/*
* if we have any requests, pending schedule them with the new
* if we have any requests pending, schedule them with the new
* connections.
*/
@ -309,19 +320,32 @@ static int
evrpc_schedule_request(struct evhttp_connection *connection,
struct evrpc_request_wrapper *ctx)
{
struct evbuffer *output;
struct evhttp_request *req;
if ((output = evbuffer_new()) == NULL)
goto error;
struct evhttp_request *req = NULL;
char *uri = NULL;
int res = 0;
if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
goto error;
/* serialize the request data into the output buffer */
ctx->request_marshal(req->output_buffer, ctx->request);
uri = evrpc_construct_uri(ctx->name);
if (uri == NULL)
goto error;
/* start the request over the connection */
res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
free(uri);
if (res == -1)
goto error;
return (0);
error:
(*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
free(ctx);
evrpc_request_wrapper_free(ctx);
return (-1);
}
@ -354,4 +378,18 @@ static void
evrpc_reply_done(struct evhttp_request *req, void *arg)
{
struct evrpc_request_wrapper *ctx = arg;
int res;
/* we need to get the reply now */
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
if (res == -1) {
/* clear everything that we might have written previously */
ctx->reply_clear(ctx->reply);
}
(*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
evrpc_request_wrapper_free(ctx);
/* the http layer owns the request structure */
}

View File

@ -51,6 +51,7 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "event.h"
#include "evhttp.h"
@ -275,9 +276,108 @@ rpc_basic_message(void)
evhttp_free(http);
}
static struct evrpc_pool *
rpc_pool_with_connection(short port)
{
struct evhttp_connection *evcon;
struct evrpc_pool *pool;
pool = evrpc_pool_new();
assert(pool != NULL);
evcon = evhttp_connection_new("127.0.0.1", port);
assert(evcon != NULL);
evrpc_pool_add_connection(pool, evcon);
return (pool);
}
static void
GotKillCb(struct msg *msg, struct kill *kill, void *arg)
{
char *weapon;
char *action;
if (EVTAG_GET(kill, weapon, &weapon) == -1) {
fprintf(stderr, "get weapon\n");
goto done;
}
if (EVTAG_GET(kill, action, &action) == -1) {
fprintf(stderr, "get action\n");
goto done;
}
if (strcmp(weapon, "dagger"))
goto done;
if (strcmp(action, "wave around like an idiot"))
goto done;
test_ok += 1;
done:
event_loopexit(NULL);
}
static void
rpc_basic_client(void)
{
short port;
struct evhttp *http = NULL;
struct evrpc_base *base = NULL;
struct evrpc_pool *pool = NULL;
struct msg *msg;
struct kill *kill;
fprintf(stdout, "Testing RPC Client: ");
rpc_setup(&http, &port, &base);
pool = rpc_pool_with_connection(port);
/* set up the basic message */
msg = msg_new();
EVTAG_ASSIGN(msg, from_name, "niels");
EVTAG_ASSIGN(msg, to_name, "tester");
kill = kill_new();
EVRPC_MAKE_REQUEST(Message, msg, kill, GotKillCb, NULL);
test_ok = 0;
event_dispatch();
if (test_ok != 1) {
fprintf(stdout, "FAILED (1)\n");
exit(1);
}
/* we do it twice to make sure that reuse works correctly */
kill_clear(kill);
EVRPC_MAKE_REQUEST(Message, msg, kill, GotKillCb, NULL);
event_dispatch();
if (test_ok != 2) {
fprintf(stdout, "FAILED (2)\n");
exit(1);
}
fprintf(stdout, "OK\n");
msg_free(msg);
kill_free(kill);
evrpc_pool_free(pool);
evhttp_free(http);
}
void
rpc_suite(void)
{
rpc_basic_test();
rpc_basic_message();
rpc_basic_client();
}