Support temporarily suspending an evbuffer callback. This is different from disabling the callback, since we want to process changes, but not just yet.

svn:r1091
This commit is contained in:
Nick Mathewson 2009-02-01 01:43:58 +00:00
parent e3e1153109
commit 8d3a10f8f1
4 changed files with 87 additions and 6 deletions

View File

@ -107,6 +107,16 @@ static int use_sendfile = 1;
static int use_mmap = 1;
#endif
/* Mask of user-selectable callback flags. */
#define EVBUFFER_CB_USER_FLAGS 0xffff
/* Mask of all internal-use-only flags. */
#define EVBUFFER_CB_INTERNAL_FLAGS 0xffff0000
/* Flag set on suspended callbacks. */
#define EVBUFFER_CB_SUSPENDED 0x00010000
/* Flag set if we should invoke the callback on wakeup. */
#define EVBUFFER_CB_CALL_ON_UNSUSPEND 0x00020000
/* evbuffer_chain support */
#define CHAIN_SPACE_PTR(ch) ((ch)->buffer + (ch)->misalign + (ch)->off)
#define CHAIN_SPACE_LEN(ch) ((ch)->flags & EVBUFFER_IMMUTABLE ? \
@ -229,8 +239,13 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer, size_t old_size)
/* Get the 'next' pointer now in case this callback decides
* to remove itself or something. */
next = TAILQ_NEXT(cbent, next);
if ((cbent->flags & EVBUFFER_CB_ENABLED))
cbent->cb(buffer, old_size, new_size, cbent->cbarg);
if ((cbent->flags & EVBUFFER_CB_ENABLED)) {
if ((cbent->flags & EVBUFFER_CB_SUSPENDED))
cbent->flags |= EVBUFFER_CB_CALL_ON_UNSUSPEND;
else
cbent->cb(buffer,
old_size, new_size, cbent->cbarg);
}
}
}
@ -1578,9 +1593,33 @@ evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg)
int
evbuffer_cb_set_flags(struct evbuffer *buffer,
struct evbuffer_cb_entry *cb, unsigned flags)
struct evbuffer_cb_entry *cb, ev_uint32_t flags)
{
(void)buffer; /* unused */
cb->flags = flags;
cb->flags = (cb->flags & EVBUFFER_CB_INTERNAL_FLAGS) | flags;
return 0;
}
void
evbuffer_cb_suspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb)
{
if (!(cb->flags & EVBUFFER_CB_SUSPENDED)) {
cb->size_before_suspend = EVBUFFER_LENGTH(buffer);
cb->flags |= EVBUFFER_CB_SUSPENDED;
}
}
void
evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb)
{
if ((cb->flags & EVBUFFER_CB_SUSPENDED)) {
unsigned call = (cb->flags & EVBUFFER_CB_CALL_ON_UNSUSPEND);
size_t sz = cb->size_before_suspend;
cb->flags &= ~(EVBUFFER_CB_SUSPENDED|
EVBUFFER_CB_CALL_ON_UNSUSPEND);
cb->size_before_suspend = 0;
if (call && (cb->flags & EVBUFFER_CB_ENABLED)) {
cb->cb(buffer, sz, EVBUFFER_LENGTH(buffer), cb->cbarg);
}
}
}

View File

@ -42,7 +42,8 @@ struct evbuffer_cb_entry {
TAILQ_ENTRY(evbuffer_cb_entry) next;
evbuffer_cb cb;
void *cbarg;
unsigned flags;
ev_uint32_t flags;
size_t size_before_suspend;
};
struct evbuffer_chain;

View File

@ -424,7 +424,28 @@ int evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg);
@return 0 on success, -1 on failure.
*/
int evbuffer_cb_set_flags(struct evbuffer *buffer,
struct evbuffer_cb_entry *cb, unsigned flags);
struct evbuffer_cb_entry *cb, ev_uint32_t flags);
/** Postpone calling a given callback until unsuspend is called later.
This is different from disabling the callback, since the callback will get
invoked later if the buffer size changes between now and when we unsuspend
it.
@param the buffer that the callback is watching.
@param cb the callback we want to suspend.
*/
void evbuffer_cb_suspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb);
/** Stop postponing a callback that we posponed with evbuffer_cb_suspend.
If data was added to or removed from the buffer while the callback was
suspended, the callback will get called once now.
@param the buffer that the callback is watching.
@param cb the callback we want to stop suspending.
*/
void evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb);
/**
Makes the data at the begging of an evbuffer contiguous.

View File

@ -569,6 +569,26 @@ test_evbuffer_callbacks(void *ptr)
evbuffer_add_printf(buf, "This will not.");
tt_str_op(evbuffer_pullup(buf, -1), ==, "This will not.");
evbuffer_drain(buf, EVBUFFER_LENGTH(buf));
/* Now let's try a suspended callback. */
cb1 = evbuffer_add_cb(buf, log_change_callback, buf_out1);
cb2 = evbuffer_add_cb(buf, log_change_callback, buf_out2);
evbuffer_cb_suspend(buf,cb2);
evbuffer_prepend(buf,"Hello world",11); /*0->11*/
evbuffer_validate(buf);
evbuffer_cb_suspend(buf,cb1);
evbuffer_add(buf,"more",4); /* 11->15 */
evbuffer_cb_unsuspend(buf,cb2);
evbuffer_drain(buf, 4); /* 15->11 */
evbuffer_cb_unsuspend(buf,cb1);
evbuffer_drain(buf, EVBUFFER_LENGTH(buf)); /* 11->0 */
tt_str_op(evbuffer_pullup(buf_out1, -1), ==,
"0->11; 11->11; 11->0; ");
tt_str_op(evbuffer_pullup(buf_out2, -1), ==,
"0->15; 15->11; 11->0; ");
end:
if (buf)
evbuffer_free(buf);