/* * Copyright (c) 2009 Niels Provos and Nick Mathewson * * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "event-config.h" #ifdef _EVENT_HAVE_SYS_TIME_H #include #endif #include #include #include #include #ifdef _EVENT_HAVE_STDARG_H #include #endif #ifdef _EVENT_HAVE_UNISTD_H #include #endif #ifdef WIN32 #include #endif #include "event2/util.h" #include "event2/bufferevent.h" #include "event2/buffer.h" #include "event2/bufferevent_struct.h" #include "event2/event.h" #include "log-internal.h" #include "mm-internal.h" #include "bufferevent-internal.h" #include "util-internal.h" #include "iocp-internal.h" /* prototypes */ static int be_async_enable(struct bufferevent *, short); static int be_async_disable(struct bufferevent *, short); static void be_async_destruct(struct bufferevent *); static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); const struct bufferevent_ops bufferevent_ops_async = { "socket_async", 0, be_async_enable, be_async_disable, be_async_destruct, _bufferevent_generic_adj_timeouts, be_async_flush, be_async_ctrl, }; struct bufferevent_async { struct bufferevent_private bev; unsigned read_in_progress : 1; unsigned write_in_progress : 1; }; static inline struct bufferevent_async * upcast(struct bufferevent *bev) { struct bufferevent_async *bev_a; if (bev->be_ops != &bufferevent_ops_async) return NULL; bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async); return bev_a; } static void bev_async_consider_writing(struct bufferevent_async *b) { /* Don't write if there's a write in progress, or we do not * want to write. */ if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) return; /* Don't write if there's nothing to write */ if (!evbuffer_get_length(b->bev.bev.output)) return; /* XXXX doesn't respect low-water mark very well. */ if (evbuffer_launch_write(b->bev.bev.output, -1)) { EVUTIL_ASSERT(0);/* XXX act sensibly. */ } else { b->write_in_progress = 1; } } static void bev_async_consider_reading(struct bufferevent_async *b) { size_t cur_size; size_t read_high; size_t at_most; /* Don't read if there is a read in progress, or we do not * want to read. */ if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) return; /* Don't read if we're full */ cur_size = evbuffer_get_length(b->bev.bev.input); read_high = b->bev.bev.wm_read.high; if (read_high) { if (cur_size >= read_high) return; at_most = read_high - cur_size; } else { at_most = 16384; /* FIXME totally magic. */ } if (evbuffer_launch_read(b->bev.bev.input, at_most)) { EVUTIL_ASSERT(0); } else { b->read_in_progress = 1; } } static void be_async_outbuf_callback(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) { struct bufferevent *bev = arg; struct bufferevent_async *bev_async = upcast(bev); /* If we successfully wrote from the outbuf, or we added data to the * outbuf and were not writing before, we may want to write now. */ _bufferevent_incref_and_lock(bev); if (cbinfo->n_deleted) { /* XXXX can't detect 0-length write completion */ bev_async->write_in_progress = 0; } if (cbinfo->n_added || cbinfo->n_deleted) bev_async_consider_writing(bev_async); if (cbinfo->n_deleted) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); if (bev->writecb != NULL && evbuffer_get_length(bev->output) <= bev->wm_write.low) _bufferevent_run_writecb(bev); } _bufferevent_decref_and_unlock(bev); } static void be_async_inbuf_callback(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) { struct bufferevent *bev = arg; struct bufferevent_async *bev_async = upcast(bev); /* If we successfully read into the inbuf, or we drained data from * the inbuf and were not reading before, we may want to read now */ _bufferevent_incref_and_lock(bev); if (cbinfo->n_added) { /* XXXX can't detect 0-length read completion */ bev_async->read_in_progress = 0; } if (cbinfo->n_added || cbinfo->n_deleted) bev_async_consider_reading(bev_async); if (cbinfo->n_added) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); if (evbuffer_get_length(bev->input) >= bev->wm_read.low && bev->readcb != NULL) _bufferevent_run_readcb(bev); } _bufferevent_decref_and_unlock(bev); } static int be_async_enable(struct bufferevent *buf, short what) { struct bufferevent_async *bev_async = upcast(buf); _bufferevent_generic_adj_timeouts(buf); /* If we newly enable reading or writing, and we aren't reading or writing already, consider launching a new read or write. */ if (what & EV_READ) bev_async_consider_reading(bev_async); if (what & EV_WRITE) bev_async_consider_writing(bev_async); return 0; } static int be_async_disable(struct bufferevent *bev, short what) { /* XXXX If we disable reading or writing, we may want to consider * canceling any in-progress read or write operation, though it might * not work. */ _bufferevent_generic_adj_timeouts(bev); return 0; } static void be_async_destruct(struct bufferevent *bev) { _bufferevent_del_generic_timeout_cbs(bev); } static int be_async_flush(struct bufferevent *bev, short what, enum bufferevent_flush_mode mode) { return 0; } struct bufferevent * bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options) { struct bufferevent_async *bev_a; struct bufferevent *bev; struct event_iocp_port *iocp; options |= BEV_OPT_THREADSAFE; if (!(iocp = event_base_get_iocp(base))) return NULL; if (event_iocp_port_associate(iocp, fd, 1)<0) return NULL; if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) return NULL; bev = &bev_a->bev.bev; if (!(bev->input = evbuffer_overlapped_new(fd))) { mm_free(bev_a); return NULL; } if (!(bev->output = evbuffer_overlapped_new(fd))) { evbuffer_free(bev->input); mm_free(bev_a); return NULL; } if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async, options)<0) goto err; evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); evbuffer_defer_callbacks(bev->input, base); evbuffer_defer_callbacks(bev->output, base); _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev); return bev; err: bufferevent_free(&bev_a->bev.bev); return NULL; } static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, union bufferevent_ctrl_data *data) { switch (op) { case BEV_CTRL_GET_FD: data->fd = _evbuffer_overlapped_get_fd(bev->input); return 0; case BEV_CTRL_SET_FD: case BEV_CTRL_GET_UNDERLYING: default: return -1; } }