libevent/bufferevent.c
Nick Mathewson 8ac3c4c25b Have all visible internal function names end with an underscore.
We haven't had a convention for naming internal functions in -internal.h
versus naming visible functions in include/**.h.  This patch changes every
function declared in a -internal.h file to be named ending with an
underscore.

Static function names are unaffected, since there's no risk of calling them
from outside Libevent.

This is an automatic conversion.  The script that performed it was generated by
running the following script over the output of

ctags --c-kinds=pf -o - *-internal.h |  cut -f 1 | sort| uniq

(GNU ctags was required.)

=====
#!/usr/bin/perl -w -n

use strict;

BEGIN { print "#!/usr/bin/perl -w -i -p\n\n"; }

chomp;

my $ident = $_;

next if ($ident =~ /_$/);
next if ($ident =~ /^TAILQ/);

my $better = "${ident}_";

print "s/(?<![A-Za-z0-9_])$ident(?![A-Za-z0-9_])/$better/g;\n";

=== And then running the script below that it generated over all
=== the .c and .h files again
#!/usr/bin/perl -w -i -p

s/(?<![A-Za-z0-9_])bufferevent_async_can_connect(?![A-Za-z0-9_])/bufferevent_async_can_connect_/g;
s/(?<![A-Za-z0-9_])bufferevent_async_connect(?![A-Za-z0-9_])/bufferevent_async_connect_/g;
s/(?<![A-Za-z0-9_])bufferevent_async_new(?![A-Za-z0-9_])/bufferevent_async_new_/g;
s/(?<![A-Za-z0-9_])bufferevent_async_set_connected(?![A-Za-z0-9_])/bufferevent_async_set_connected_/g;
s/(?<![A-Za-z0-9_])bufferevent_decref(?![A-Za-z0-9_])/bufferevent_decref_/g;
s/(?<![A-Za-z0-9_])bufferevent_disable_hard(?![A-Za-z0-9_])/bufferevent_disable_hard_/g;
s/(?<![A-Za-z0-9_])bufferevent_enable_locking(?![A-Za-z0-9_])/bufferevent_enable_locking_/g;
s/(?<![A-Za-z0-9_])bufferevent_incref(?![A-Za-z0-9_])/bufferevent_incref_/g;
s/(?<![A-Za-z0-9_])bufferevent_init_common(?![A-Za-z0-9_])/bufferevent_init_common_/g;
s/(?<![A-Za-z0-9_])bufferevent_remove_from_rate_limit_group_internal(?![A-Za-z0-9_])/bufferevent_remove_from_rate_limit_group_internal_/g;
s/(?<![A-Za-z0-9_])bufferevent_suspend_read(?![A-Za-z0-9_])/bufferevent_suspend_read_/g;
s/(?<![A-Za-z0-9_])bufferevent_suspend_write(?![A-Za-z0-9_])/bufferevent_suspend_write_/g;
s/(?<![A-Za-z0-9_])bufferevent_unsuspend_read(?![A-Za-z0-9_])/bufferevent_unsuspend_read_/g;
s/(?<![A-Za-z0-9_])bufferevent_unsuspend_write(?![A-Za-z0-9_])/bufferevent_unsuspend_write_/g;
s/(?<![A-Za-z0-9_])evbuffer_commit_read(?![A-Za-z0-9_])/evbuffer_commit_read_/g;
s/(?<![A-Za-z0-9_])evbuffer_commit_write(?![A-Za-z0-9_])/evbuffer_commit_write_/g;
s/(?<![A-Za-z0-9_])evbuffer_invoke_callbacks(?![A-Za-z0-9_])/evbuffer_invoke_callbacks_/g;
s/(?<![A-Za-z0-9_])evbuffer_launch_read(?![A-Za-z0-9_])/evbuffer_launch_read_/g;
s/(?<![A-Za-z0-9_])evbuffer_launch_write(?![A-Za-z0-9_])/evbuffer_launch_write_/g;
s/(?<![A-Za-z0-9_])evbuffer_overlapped_new(?![A-Za-z0-9_])/evbuffer_overlapped_new_/g;
s/(?<![A-Za-z0-9_])evbuffer_set_parent(?![A-Za-z0-9_])/evbuffer_set_parent_/g;
s/(?<![A-Za-z0-9_])event_active_nolock(?![A-Za-z0-9_])/event_active_nolock_/g;
s/(?<![A-Za-z0-9_])event_base_add_virtual(?![A-Za-z0-9_])/event_base_add_virtual_/g;
s/(?<![A-Za-z0-9_])event_base_assert_ok(?![A-Za-z0-9_])/event_base_assert_ok_/g;
s/(?<![A-Za-z0-9_])event_base_del_virtual(?![A-Za-z0-9_])/event_base_del_virtual_/g;
s/(?<![A-Za-z0-9_])event_base_get_deferred_cb_queue(?![A-Za-z0-9_])/event_base_get_deferred_cb_queue_/g;
s/(?<![A-Za-z0-9_])event_base_get_iocp(?![A-Za-z0-9_])/event_base_get_iocp_/g;
s/(?<![A-Za-z0-9_])event_base_start_iocp(?![A-Za-z0-9_])/event_base_start_iocp_/g;
s/(?<![A-Za-z0-9_])event_base_stop_iocp(?![A-Za-z0-9_])/event_base_stop_iocp_/g;
s/(?<![A-Za-z0-9_])event_changelist_add(?![A-Za-z0-9_])/event_changelist_add_/g;
s/(?<![A-Za-z0-9_])event_changelist_del(?![A-Za-z0-9_])/event_changelist_del_/g;
s/(?<![A-Za-z0-9_])event_changelist_freemem(?![A-Za-z0-9_])/event_changelist_freemem_/g;
s/(?<![A-Za-z0-9_])event_changelist_init(?![A-Za-z0-9_])/event_changelist_init_/g;
s/(?<![A-Za-z0-9_])event_changelist_remove_all(?![A-Za-z0-9_])/event_changelist_remove_all_/g;
s/(?<![A-Za-z0-9_])event_deferred_cb_cancel(?![A-Za-z0-9_])/event_deferred_cb_cancel_/g;
s/(?<![A-Za-z0-9_])event_deferred_cb_init(?![A-Za-z0-9_])/event_deferred_cb_init_/g;
s/(?<![A-Za-z0-9_])event_deferred_cb_queue_init(?![A-Za-z0-9_])/event_deferred_cb_queue_init_/g;
s/(?<![A-Za-z0-9_])event_deferred_cb_schedule(?![A-Za-z0-9_])/event_deferred_cb_schedule_/g;
s/(?<![A-Za-z0-9_])event_get_win32_extension_fns(?![A-Za-z0-9_])/event_get_win32_extension_fns_/g;
s/(?<![A-Za-z0-9_])event_iocp_activate_overlapped(?![A-Za-z0-9_])/event_iocp_activate_overlapped_/g;
s/(?<![A-Za-z0-9_])event_iocp_port_associate(?![A-Za-z0-9_])/event_iocp_port_associate_/g;
s/(?<![A-Za-z0-9_])event_iocp_port_launch(?![A-Za-z0-9_])/event_iocp_port_launch_/g;
s/(?<![A-Za-z0-9_])event_iocp_shutdown(?![A-Za-z0-9_])/event_iocp_shutdown_/g;
s/(?<![A-Za-z0-9_])event_overlapped_init(?![A-Za-z0-9_])/event_overlapped_init_/g;
s/(?<![A-Za-z0-9_])evhttp_connection_connect(?![A-Za-z0-9_])/evhttp_connection_connect_/g;
s/(?<![A-Za-z0-9_])evhttp_connection_fail(?![A-Za-z0-9_])/evhttp_connection_fail_/g;
s/(?<![A-Za-z0-9_])evhttp_connection_reset(?![A-Za-z0-9_])/evhttp_connection_reset_/g;
s/(?<![A-Za-z0-9_])evhttp_parse_firstline(?![A-Za-z0-9_])/evhttp_parse_firstline_/g;
s/(?<![A-Za-z0-9_])evhttp_parse_headers(?![A-Za-z0-9_])/evhttp_parse_headers_/g;
s/(?<![A-Za-z0-9_])evhttp_response_code(?![A-Za-z0-9_])/evhttp_response_code_/g;
s/(?<![A-Za-z0-9_])evhttp_send_page(?![A-Za-z0-9_])/evhttp_send_page_/g;
s/(?<![A-Za-z0-9_])evhttp_start_read(?![A-Za-z0-9_])/evhttp_start_read_/g;
s/(?<![A-Za-z0-9_])EVLOCK_TRY_LOCK(?![A-Za-z0-9_])/EVLOCK_TRY_LOCK_/g;
s/(?<![A-Za-z0-9_])evmap_check_integrity(?![A-Za-z0-9_])/evmap_check_integrity_/g;
s/(?<![A-Za-z0-9_])evmap_delete_all(?![A-Za-z0-9_])/evmap_delete_all_/g;
s/(?<![A-Za-z0-9_])evmap_foreach_event(?![A-Za-z0-9_])/evmap_foreach_event_/g;
s/(?<![A-Za-z0-9_])evmap_io_active(?![A-Za-z0-9_])/evmap_io_active_/g;
s/(?<![A-Za-z0-9_])evmap_io_add(?![A-Za-z0-9_])/evmap_io_add_/g;
s/(?<![A-Za-z0-9_])evmap_io_clear(?![A-Za-z0-9_])/evmap_io_clear_/g;
s/(?<![A-Za-z0-9_])evmap_io_del(?![A-Za-z0-9_])/evmap_io_del_/g;
s/(?<![A-Za-z0-9_])evmap_io_get_fdinfo(?![A-Za-z0-9_])/evmap_io_get_fdinfo_/g;
s/(?<![A-Za-z0-9_])evmap_io_initmap(?![A-Za-z0-9_])/evmap_io_initmap_/g;
s/(?<![A-Za-z0-9_])evmap_reinit(?![A-Za-z0-9_])/evmap_reinit_/g;
s/(?<![A-Za-z0-9_])evmap_signal_active(?![A-Za-z0-9_])/evmap_signal_active_/g;
s/(?<![A-Za-z0-9_])evmap_signal_add(?![A-Za-z0-9_])/evmap_signal_add_/g;
s/(?<![A-Za-z0-9_])evmap_signal_clear(?![A-Za-z0-9_])/evmap_signal_clear_/g;
s/(?<![A-Za-z0-9_])evmap_signal_del(?![A-Za-z0-9_])/evmap_signal_del_/g;
s/(?<![A-Za-z0-9_])evmap_signal_initmap(?![A-Za-z0-9_])/evmap_signal_initmap_/g;
s/(?<![A-Za-z0-9_])evrpc_hook_associate_meta(?![A-Za-z0-9_])/evrpc_hook_associate_meta_/g;
s/(?<![A-Za-z0-9_])evrpc_hook_context_free(?![A-Za-z0-9_])/evrpc_hook_context_free_/g;
s/(?<![A-Za-z0-9_])evrpc_hook_meta_new(?![A-Za-z0-9_])/evrpc_hook_meta_new_/g;
s/(?<![A-Za-z0-9_])evrpc_reqstate_free(?![A-Za-z0-9_])/evrpc_reqstate_free_/g;
s/(?<![A-Za-z0-9_])evsig_dealloc(?![A-Za-z0-9_])/evsig_dealloc_/g;
s/(?<![A-Za-z0-9_])evsig_init(?![A-Za-z0-9_])/evsig_init_/g;
s/(?<![A-Za-z0-9_])evsig_set_base(?![A-Za-z0-9_])/evsig_set_base_/g;
s/(?<![A-Za-z0-9_])ev_token_bucket_get_tick(?![A-Za-z0-9_])/ev_token_bucket_get_tick_/g;
s/(?<![A-Za-z0-9_])ev_token_bucket_init(?![A-Za-z0-9_])/ev_token_bucket_init_/g;
s/(?<![A-Za-z0-9_])ev_token_bucket_update(?![A-Za-z0-9_])/ev_token_bucket_update_/g;
s/(?<![A-Za-z0-9_])evutil_accept4(?![A-Za-z0-9_])/evutil_accept4_/g;
s/(?<![A-Za-z0-9_])evutil_addrinfo_append(?![A-Za-z0-9_])/evutil_addrinfo_append_/g;
s/(?<![A-Za-z0-9_])evutil_adjust_hints_for_addrconfig(?![A-Za-z0-9_])/evutil_adjust_hints_for_addrconfig_/g;
s/(?<![A-Za-z0-9_])evutil_ersatz_socketpair(?![A-Za-z0-9_])/evutil_ersatz_socketpair_/g;
s/(?<![A-Za-z0-9_])evutil_eventfd(?![A-Za-z0-9_])/evutil_eventfd_/g;
s/(?<![A-Za-z0-9_])evutil_format_sockaddr_port(?![A-Za-z0-9_])/evutil_format_sockaddr_port_/g;
s/(?<![A-Za-z0-9_])evutil_getaddrinfo_async(?![A-Za-z0-9_])/evutil_getaddrinfo_async_/g;
s/(?<![A-Za-z0-9_])evutil_getaddrinfo_common(?![A-Za-z0-9_])/evutil_getaddrinfo_common_/g;
s/(?<![A-Za-z0-9_])evutil_getenv(?![A-Za-z0-9_])/evutil_getenv_/g;
s/(?<![A-Za-z0-9_])evutil_hex_char_to_int(?![A-Za-z0-9_])/evutil_hex_char_to_int_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISALNUM(?![A-Za-z0-9_])/EVUTIL_ISALNUM_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISALPHA(?![A-Za-z0-9_])/EVUTIL_ISALPHA_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISDIGIT(?![A-Za-z0-9_])/EVUTIL_ISDIGIT_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISLOWER(?![A-Za-z0-9_])/EVUTIL_ISLOWER_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISPRINT(?![A-Za-z0-9_])/EVUTIL_ISPRINT_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISSPACE(?![A-Za-z0-9_])/EVUTIL_ISSPACE_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISUPPER(?![A-Za-z0-9_])/EVUTIL_ISUPPER_/g;
s/(?<![A-Za-z0-9_])EVUTIL_ISXDIGIT(?![A-Za-z0-9_])/EVUTIL_ISXDIGIT_/g;
s/(?<![A-Za-z0-9_])evutil_load_windows_system_library(?![A-Za-z0-9_])/evutil_load_windows_system_library_/g;
s/(?<![A-Za-z0-9_])evutil_make_internal_pipe(?![A-Za-z0-9_])/evutil_make_internal_pipe_/g;
s/(?<![A-Za-z0-9_])evutil_new_addrinfo(?![A-Za-z0-9_])/evutil_new_addrinfo_/g;
s/(?<![A-Za-z0-9_])evutil_open_closeonexec(?![A-Za-z0-9_])/evutil_open_closeonexec_/g;
s/(?<![A-Za-z0-9_])evutil_read_file(?![A-Za-z0-9_])/evutil_read_file_/g;
s/(?<![A-Za-z0-9_])evutil_resolve(?![A-Za-z0-9_])/evutil_resolve_/g;
s/(?<![A-Za-z0-9_])evutil_set_evdns_getaddrinfo_fn(?![A-Za-z0-9_])/evutil_set_evdns_getaddrinfo_fn_/g;
s/(?<![A-Za-z0-9_])evutil_sockaddr_is_loopback(?![A-Za-z0-9_])/evutil_sockaddr_is_loopback_/g;
s/(?<![A-Za-z0-9_])evutil_socket(?![A-Za-z0-9_])/evutil_socket_/g;
s/(?<![A-Za-z0-9_])evutil_socket_connect(?![A-Za-z0-9_])/evutil_socket_connect_/g;
s/(?<![A-Za-z0-9_])evutil_socket_finished_connecting(?![A-Za-z0-9_])/evutil_socket_finished_connecting_/g;
s/(?<![A-Za-z0-9_])EVUTIL_TOLOWER(?![A-Za-z0-9_])/EVUTIL_TOLOWER_/g;
s/(?<![A-Za-z0-9_])EVUTIL_TOUPPER(?![A-Za-z0-9_])/EVUTIL_TOUPPER_/g;
s/(?<![A-Za-z0-9_])evutil_tv_to_msec(?![A-Za-z0-9_])/evutil_tv_to_msec_/g;
s/(?<![A-Za-z0-9_])evutil_usleep(?![A-Za-z0-9_])/evutil_usleep_/g;
s/(?<![A-Za-z0-9_])ht_improve_hash(?![A-Za-z0-9_])/ht_improve_hash_/g;
s/(?<![A-Za-z0-9_])ht_string_hash(?![A-Za-z0-9_])/ht_string_hash_/g;
s/(?<![A-Za-z0-9_])min_heap_adjust(?![A-Za-z0-9_])/min_heap_adjust_/g;
s/(?<![A-Za-z0-9_])min_heap_ctor(?![A-Za-z0-9_])/min_heap_ctor_/g;
s/(?<![A-Za-z0-9_])min_heap_dtor(?![A-Za-z0-9_])/min_heap_dtor_/g;
s/(?<![A-Za-z0-9_])min_heap_elem_init(?![A-Za-z0-9_])/min_heap_elem_init_/g;
s/(?<![A-Za-z0-9_])min_heap_elt_is_top(?![A-Za-z0-9_])/min_heap_elt_is_top_/g;
s/(?<![A-Za-z0-9_])min_heap_empty(?![A-Za-z0-9_])/min_heap_empty_/g;
s/(?<![A-Za-z0-9_])min_heap_erase(?![A-Za-z0-9_])/min_heap_erase_/g;
s/(?<![A-Za-z0-9_])min_heap_pop(?![A-Za-z0-9_])/min_heap_pop_/g;
s/(?<![A-Za-z0-9_])min_heap_push(?![A-Za-z0-9_])/min_heap_push_/g;
s/(?<![A-Za-z0-9_])min_heap_reserve(?![A-Za-z0-9_])/min_heap_reserve_/g;
s/(?<![A-Za-z0-9_])min_heap_size(?![A-Za-z0-9_])/min_heap_size_/g;
s/(?<![A-Za-z0-9_])min_heap_top(?![A-Za-z0-9_])/min_heap_top_/g;
2012-02-29 15:07:33 -05:00

899 lines
22 KiB
C

/*
* Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
* Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "event2/event-config.h"
#include "evconfig-private.h"
#include <sys/types.h>
#ifdef EVENT__HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef EVENT__HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef _WIN32
#include <winsock2.h>
#endif
#include <errno.h>
#include "event2/util.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "evbuffer-internal.h"
#include "util-internal.h"
/* Forward declaration: the definition appears later in this file, after
 * its use in bufferevent_free(). */
static void bufferevent_cancel_all_(struct bufferevent *bev);
/* Record the suspend reason(s) in 'what' for the read side of bufev.
 * The backend's disable(EV_READ) op is called only when no read-suspend
 * flag was set before this call. */
void
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int was_suspended;

	BEV_LOCK(bufev);
	was_suspended = (priv->read_suspended != 0);
	if (!was_suspended)
		bufev->be_ops->disable(bufev, EV_READ);
	priv->read_suspended |= what;
	BEV_UNLOCK(bufev);
}
/* Clear the suspend reason(s) in 'what' for the read side of bufev.  If
 * no read-suspend flag remains and EV_READ is enabled, the backend's
 * enable(EV_READ) op is called. */
void
bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	priv->read_suspended &= ~what;
	if (priv->read_suspended == 0 && (bufev->enabled & EV_READ) != 0)
		bufev->be_ops->enable(bufev, EV_READ);
	BEV_UNLOCK(bufev);
}
/* Record the suspend reason(s) in 'what' for the write side of bufev.
 * The backend's disable(EV_WRITE) op is called only when no write-suspend
 * flag was set before this call. */
void
bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int was_suspended;

	BEV_LOCK(bufev);
	was_suspended = (priv->write_suspended != 0);
	if (!was_suspended)
		bufev->be_ops->disable(bufev, EV_WRITE);
	priv->write_suspended |= what;
	BEV_UNLOCK(bufev);
}
/* Clear the suspend reason(s) in 'what' for the write side of bufev.  If
 * no write-suspend flag remains and EV_WRITE is enabled, the backend's
 * enable(EV_WRITE) op is called. */
void
bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	priv->write_suspended &= ~what;
	if (priv->write_suspended == 0 && (bufev->enabled & EV_WRITE) != 0)
		bufev->be_ops->enable(bufev, EV_WRITE);
	BEV_UNLOCK(bufev);
}
/* evbuffer callback implementing the read high-water mark on the input
 * buffer; it is only enabled while a high-water mark is set.  Reading is
 * suspended once the buffer holds at least wm_read.high bytes and
 * unsuspended otherwise.  'arg' is the owning bufferevent. */
static void
bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bufev = arg;

	if (evbuffer_get_length(buf) < bufev->wm_read.high)
		bufferevent_wm_unsuspend_read(bufev);
	else
		bufferevent_wm_suspend_read(bufev);
}
/* Deferred-callback handler that invokes the pending user callbacks while
 * holding the bufferevent lock.  'arg' is the bufferevent_private.  Ends
 * by dropping one reference and releasing the lock. */
static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
{
	struct bufferevent_private *priv = arg;
	struct bufferevent *bev = &priv->bev;

	BEV_LOCK(bev);

	/* "Connected" happened before any read or write, so it is reported
	 * ahead of every other pending callback. */
	if (bev->errorcb && (priv->eventcb_pending & BEV_EVENT_CONNECTED)) {
		priv->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		bev->errorcb(bev, BEV_EVENT_CONNECTED, bev->cbarg);
	}

	if (bev->readcb && priv->readcb_pending) {
		priv->readcb_pending = 0;
		bev->readcb(bev, bev->cbarg);
	}

	if (bev->writecb && priv->writecb_pending) {
		priv->writecb_pending = 0;
		bev->writecb(bev, bev->cbarg);
	}

	if (bev->errorcb && priv->eventcb_pending) {
		const short pending_events = priv->eventcb_pending;
		const int saved_err = priv->errno_pending;

		/* Clear the pending state before calling out, then restore
		 * the socket error that was saved when the event was queued. */
		priv->eventcb_pending = 0;
		priv->errno_pending = 0;
		EVUTIL_SET_SOCKET_ERROR(saved_err);
		bev->errorcb(bev, pending_events, bev->cbarg);
	}

	bufferevent_decref_and_unlock_(bev);
}
static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
{
struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev;
BEV_LOCK(bufev);
#define UNLOCKED(stmt) \
do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)
if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
bufev->errorcb) {
/* The "connected" happened before any reads or writes, so
send it first. */
bufferevent_event_cb errorcb = bufev->errorcb;
void *cbarg = bufev->cbarg;
bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
}
if (bufev_private->readcb_pending && bufev->readcb) {
bufferevent_data_cb readcb = bufev->readcb;
void *cbarg = bufev->cbarg;
bufev_private->readcb_pending = 0;
UNLOCKED(readcb(bufev, cbarg));
}
if (bufev_private->writecb_pending && bufev->writecb) {
bufferevent_data_cb writecb = bufev->writecb;
void *cbarg = bufev->cbarg;
bufev_private->writecb_pending = 0;
UNLOCKED(writecb(bufev, cbarg));
}
if (bufev_private->eventcb_pending && bufev->errorcb) {
bufferevent_event_cb errorcb = bufev->errorcb;
void *cbarg = bufev->cbarg;
short what = bufev_private->eventcb_pending;
int err = bufev_private->errno_pending;
bufev_private->eventcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
UNLOCKED(errorcb(bufev,what,cbarg));
}
bufferevent_decref_and_unlock_(bufev);
#undef UNLOCKED
}
/* Take a reference on the bufferevent embedded in 'bev_priv' (a
 * struct bufferevent_private *) and schedule its 'deferred' callback on
 * the deferred-callback queue of its event base. */
#define SCHEDULE_DEFERRED(bev_priv)					\
	do {								\
		bufferevent_incref_(&(bev_priv)->bev);			\
		event_deferred_cb_schedule_(				\
		    event_base_get_deferred_cb_queue_(			\
			(bev_priv)->bev.ev_base),			\
		    &(bev_priv)->deferred);				\
	} while (0)
/* Deliver the read callback of bufev, if one is set: immediately, or via
 * the deferred-callback queue when BEV_OPT_DEFER_CALLBACKS is in effect.
 * The caller must hold the lock and a reference. */
void
bufferevent_run_readcb_(struct bufferevent *bufev)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	if (!bufev->readcb)
		return;

	if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
		bufev->readcb(bufev, bufev->cbarg);
		return;
	}

	/* Deferred mode: mark it pending and queue the handler once. */
	priv->readcb_pending = 1;
	if (!priv->deferred.queued)
		SCHEDULE_DEFERRED(priv);
}
/* Deliver the write callback of bufev, if one is set: immediately, or via
 * the deferred-callback queue when BEV_OPT_DEFER_CALLBACKS is in effect.
 * The caller must hold the lock and a reference. */
void
bufferevent_run_writecb_(struct bufferevent *bufev)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	if (!bufev->writecb)
		return;

	if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
		bufev->writecb(bufev, bufev->cbarg);
		return;
	}

	/* Deferred mode: mark it pending and queue the handler once. */
	priv->writecb_pending = 1;
	if (!priv->deferred.queued)
		SCHEDULE_DEFERRED(priv);
}
/* Deliver the event callback of bufev with the flags in 'what', if one is
 * set: immediately, or via the deferred-callback queue when
 * BEV_OPT_DEFER_CALLBACKS is in effect.  In deferred mode the flags are
 * accumulated and the current socket error is saved for later delivery.
 * The caller must hold the lock and a reference. */
void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	if (!bufev->errorcb)
		return;

	if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
		bufev->errorcb(bufev, what, bufev->cbarg);
		return;
	}

	priv->eventcb_pending |= what;
	priv->errno_pending = EVUTIL_SOCKET_ERROR();
	if (!priv->deferred.queued)
		SCHEDULE_DEFERRED(priv);
}
int
bufferevent_init_common_(struct bufferevent_private *bufev_private,
struct event_base *base,
const struct bufferevent_ops *ops,
enum bufferevent_options options)
{
struct bufferevent *bufev = &bufev_private->bev;
if (!bufev->input) {
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
}
if (!bufev->output) {
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
return -1;
}
}
bufev_private->refcnt = 1;
bufev->ev_base = base;
/* Disable timeouts. */
evutil_timerclear(&bufev->timeout_read);
evutil_timerclear(&bufev->timeout_write);
bufev->be_ops = ops;
bufferevent_ratelim_init_(bufev_private);
/*
* Set to EV_WRITE so that using bufferevent_write is going to
* trigger a callback. Reading needs to be explicitly enabled
* because otherwise no data will be available.
*/
bufev->enabled = EV_WRITE;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE) {
if (bufferevent_enable_locking_(bufev, NULL) < 0) {
/* cleanup */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
bufev->input = NULL;
bufev->output = NULL;
return -1;
}
}
#endif
if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
== BEV_OPT_UNLOCK_CALLBACKS) {
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1;
}
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init_(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
}
bufev_private->options = options;
evbuffer_set_parent_(bufev->input, bufev);
evbuffer_set_parent_(bufev->output, bufev);
return 0;
}
/* Install the read, write and event callbacks of bufev together with the
 * argument passed to them.  All four fields are replaced under the lock. */
void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
	BEV_LOCK(bufev);

	bufev->cbarg = cbarg;
	bufev->errorcb = eventcb;
	bufev->writecb = writecb;
	bufev->readcb = readcb;

	BEV_UNLOCK(bufev);
}
/* Copy the currently installed callbacks and callback argument of bufev
 * into the given output pointers.  Any output pointer may be NULL, in
 * which case that value is not reported. */
void
bufferevent_getcb(struct bufferevent *bufev,
    bufferevent_data_cb *readcb_ptr,
    bufferevent_data_cb *writecb_ptr,
    bufferevent_event_cb *eventcb_ptr,
    void **cbarg_ptr)
{
	BEV_LOCK(bufev);

	if (readcb_ptr != NULL)
		*readcb_ptr = bufev->readcb;
	if (writecb_ptr != NULL)
		*writecb_ptr = bufev->writecb;
	if (eventcb_ptr != NULL)
		*eventcb_ptr = bufev->errorcb;
	if (cbarg_ptr != NULL)
		*cbarg_ptr = bufev->cbarg;

	BEV_UNLOCK(bufev);
}
/* Return the input evbuffer of bufev. */
struct evbuffer *
bufferevent_get_input(struct bufferevent *bufev)
{
	struct evbuffer *in = bufev->input;

	return in;
}
/* Return the output evbuffer of bufev. */
struct evbuffer *
bufferevent_get_output(struct bufferevent *bufev)
{
	struct evbuffer *out = bufev->output;

	return out;
}
/* Return the event_base stored in bufev. */
struct event_base *
bufferevent_get_base(struct bufferevent *bufev)
{
	struct event_base *base = bufev->ev_base;

	return base;
}
/* Append 'size' bytes from 'data' to the output buffer of bufev.
 * Returns 0 on success, or -1 if evbuffer_add() reports -1. */
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
	const int add_failed = (evbuffer_add(bufev->output, data, size) == -1);

	return add_failed ? -1 : 0;
}
/* Append the contents of 'buf' to the output buffer of bufev via
 * evbuffer_add_buffer().  Returns 0 on success, or -1 if that call
 * reports -1. */
int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
	const int add_failed = (evbuffer_add_buffer(bufev->output, buf) == -1);

	return add_failed ? -1 : 0;
}
/* Remove up to 'size' bytes from the input buffer of bufev into 'data'
 * and return the result of evbuffer_remove() converted to size_t.
 * NOTE(review): if evbuffer_remove() can return a negative value, that
 * value is converted to a large size_t here -- confirm against its
 * declaration. */
size_t
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
{
	size_t removed = evbuffer_remove(bufev->input, data, size);

	return removed;
}
/* Move the contents of bufev's input buffer onto the end of 'buf' and
 * return the result of evbuffer_add_buffer(). */
int
bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
	int rc = evbuffer_add_buffer(buf, bufev->input);

	return rc;
}
/* Enable the events in 'event' on bufev.  The full set is recorded in
 * bufev->enabled, but directions that are currently suspended are not
 * passed on to the backend's enable op.  Returns 0 on success, or -1 if
 * the backend reports failure. */
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	short backend_events = event;
	int rc = 0;

	bufferevent_incref_and_lock_(bufev);

	if (priv->read_suspended)
		backend_events &= ~EV_READ;
	if (priv->write_suspended)
		backend_events &= ~EV_WRITE;

	bufev->enabled |= event;

	if (backend_events != 0) {
		if (bufev->be_ops->enable(bufev, backend_events) < 0)
			rc = -1;
	}

	bufferevent_decref_and_unlock_(bufev);
	return rc;
}
int
bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *tv_read,
const struct timeval *tv_write)
{
int r = 0;
BEV_LOCK(bufev);
if (tv_read) {
bufev->timeout_read = *tv_read;
} else {
evutil_timerclear(&bufev->timeout_read);
}
if (tv_write) {
bufev->timeout_write = *tv_write;
} else {
evutil_timerclear(&bufev->timeout_write);
}
if (bufev->be_ops->adj_timeouts)
r = bufev->be_ops->adj_timeouts(bufev);
BEV_UNLOCK(bufev);
return r;
}
/* Obsolete; use bufferevent_set_timeouts */
void
bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write)
{
struct timeval tv_read, tv_write;
struct timeval *ptv_read = NULL, *ptv_write = NULL;
memset(&tv_read, 0, sizeof(tv_read));
memset(&tv_write, 0, sizeof(tv_write));
if (timeout_read) {
tv_read.tv_sec = timeout_read;
ptv_read = &tv_read;
}
if (timeout_write) {
tv_write.tv_sec = timeout_write;
ptv_write = &tv_write;
}
bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
}
/* Disable the events in 'event' on bufev and additionally clear the
 * private 'connecting' flag.  Returns 0 on success, or -1 if the
 * backend's disable op reports failure. */
int
bufferevent_disable_hard_(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int rc;

	BEV_LOCK(bufev);
	bufev->enabled &= ~event;
	priv->connecting = 0;
	rc = (bufev->be_ops->disable(bufev, event) < 0) ? -1 : 0;
	BEV_UNLOCK(bufev);

	return rc;
}
/* Disable the events in 'event' on bufev.  Returns 0 on success, or -1
 * if the backend's disable op reports failure. */
int
bufferevent_disable(struct bufferevent *bufev, short event)
{
	int rc;

	BEV_LOCK(bufev);
	bufev->enabled &= ~event;
	rc = (bufev->be_ops->disable(bufev, event) < 0) ? -1 : 0;
	BEV_UNLOCK(bufev);

	return rc;
}
/*
 * Sets the water marks.
 *
 * 'events' selects which side(s) to update (EV_READ and/or EV_WRITE);
 * 'lowmark' and 'highmark' are stored for each selected side.
 *
 * For the read side, a non-zero high-water mark installs (on first use)
 * and enables the input-buffer watermark callback, then immediately
 * suspends or unsuspends reading according to the current input length.
 * A high-water mark of zero disables that callback and unsuspends
 * reading.
 */
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	if (events & EV_WRITE) {
		bufev->wm_write.low = lowmark;
		bufev->wm_write.high = highmark;
	}

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {
			/* There is now a new high-water mark for read.
			   Enable the callback if needed, and see if we
			   should suspend or unsuspend reading. */

			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
					bufferevent_inbuf_wm_cb,
					bufev);
			}
			evbuffer_cb_set_flags(bufev->input,
			    bufev_private->read_watermarks_cb,
			    EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);

			/* Apply the same ">= high" rule that
			 * bufferevent_inbuf_wm_cb() uses, so that a buffer
			 * holding exactly 'highmark' bytes is suspended
			 * right away instead of being left in whatever
			 * state it was in before. */
			if (evbuffer_get_length(bufev->input) >= highmark)
				bufferevent_wm_suspend_read(bufev);
			else
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			/* There is now no high-water mark for read. */
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}
/* Invoke the backend's flush op for the given I/O direction(s) and mode.
 * Returns the op's result, or -1 when the backend has no flush op. */
int
bufferevent_flush(struct bufferevent *bufev,
    short iotype,
    enum bufferevent_flush_mode mode)
{
	int rc;

	BEV_LOCK(bufev);
	rc = bufev->be_ops->flush
	    ? bufev->be_ops->flush(bufev, iotype, mode)
	    : -1;
	BEV_UNLOCK(bufev);

	return rc;
}
/* Acquire the lock of bufev and add one reference.  The lock is still
 * held when this function returns. */
void
bufferevent_incref_and_lock_(struct bufferevent *bufev)
{
	struct bufferevent_private *priv = BEV_UPCAST(bufev);

	BEV_LOCK(bufev);
	priv->refcnt += 1;
}
#if 0
/* Compiled out.  Moves ownership of a shared lock from 'donor' to
 * 'recipient': nothing happens unless both use the same lock, the
 * recipient does not already own it, and the donor does. */
static void
_bufferevent_transfer_lock_ownership(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *from = BEV_UPCAST(donor);
	struct bufferevent_private *to = BEV_UPCAST(recipient);

	if (from->lock != to->lock || to->own_lock || !from->own_lock)
		return;

	from->own_lock = 0;
	to->own_lock = 1;
}
#endif
/* Drop one reference to bufev and release its lock (the function unlocks
 * on every path, so the caller is expected to hold the lock on entry).
 *
 * Returns 0 if references remain.  Returns 1 if this was the last
 * reference, in which case the bufferevent is torn down: the backend's
 * destruct op runs, both evbuffers are freed, rate-limiting state is
 * released, the lock is freed if this bufferevent owns it, the memory
 * is freed, and finally one reference on the underlying bufferevent (if
 * any) is dropped. */
int
bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
struct bufferevent *underlying;
EVUTIL_ASSERT(bufev_private->refcnt > 0);
/* Not the last reference: just unlock and report "still alive". */
if (--bufev_private->refcnt) {
BEV_UNLOCK(bufev);
return 0;
}
/* Fetch the underlying bufferevent now; bufev is freed before the
 * reference on it is dropped at the end of this function. */
underlying = bufferevent_get_underlying(bufev);
/* Clean up the shared info */
if (bufev->be_ops->destruct)
bufev->be_ops->destruct(bufev);
/* XXX what happens if refcnt for these buffers is > 1?
* The buffers can share a lock with this bufferevent object,
* but the lock might be destroyed below. */
/* evbuffer will free the callbacks */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
/* Release rate-limiting state: leave the group, remove the refill
 * event if it was initialized, then free the structure. */
if (bufev_private->rate_limiting) {
if (bufev_private->rate_limiting->group)
bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
event_del(&bufev_private->rate_limiting->refill_bucket_event);
event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
mm_free(bufev_private->rate_limiting);
bufev_private->rate_limiting = NULL;
}
event_debug_unassign(&bufev->ev_read);
event_debug_unassign(&bufev->ev_write);
/* Unlock before the lock itself may be freed. */
BEV_UNLOCK(bufev);
if (bufev_private->own_lock)
EVTHREAD_FREE_LOCK(bufev_private->lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
/* Free the actual allocated memory. */
/* The pointer passed to mm_free is bufev moved back by the backend's
 * mem_offset bytes. */
mm_free(((char*)bufev) - bufev->be_ops->mem_offset);
/* Release the reference to underlying now that we no longer need the
* reference to it. We wait this long mainly in case our lock is
* shared with underlying.
*
* The 'destruct' function will also drop a reference to underlying
* if BEV_OPT_CLOSE_ON_FREE is set.
*
* XXX Should we/can we just refcount evbuffer/bufferevent locks?
* It would probably save us some headaches.
*/
if (underlying)
bufferevent_decref_(underlying);
return 1;
}
/* Lock bufev, then drop one reference and unlock.  Returns the result of
 * bufferevent_decref_and_unlock_(). */
int
bufferevent_decref_(struct bufferevent *bufev)
{
	int rc;

	BEV_LOCK(bufev);
	rc = bufferevent_decref_and_unlock_(bufev);

	return rc;
}
/* Release the caller's reference to bufev.  All callbacks are cleared
 * and the backend is asked to cancel everything before the reference is
 * dropped and the lock released. */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);

	/* Clear the callbacks first, then cancel outstanding work. */
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	bufferevent_cancel_all_(bufev);

	bufferevent_decref_and_unlock_(bufev);
}
/* Add one reference to bufev.  The lock is taken only for the duration
 * of the increment. */
void
bufferevent_incref_(struct bufferevent *bufev)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	priv->refcnt++;
	BEV_UNLOCK(bufev);
}
/* Attach a lock to bufev and to its input and output evbuffers.
 *
 * If 'lock' is NULL, the lock of the underlying bufferevent is reused
 * when there is one; otherwise a new recursive lock is allocated and
 * marked as owned by bufev.  A caller-supplied lock is never owned.  An
 * underlying bufferevent that has no lock yet receives the same lock.
 *
 * Returns 0 on success; -1 if bufev already has a lock, if allocation
 * fails, or if thread support is compiled out. */
int
bufferevent_enable_locking_(struct bufferevent *bufev, void *lock)
{
#ifdef EVENT__DISABLE_THREAD_SUPPORT
	return -1;
#else
	struct bufferevent *underlying;
	int owns_lock = 0;

	if (BEV_UPCAST(bufev)->lock)
		return -1;
	underlying = bufferevent_get_underlying(bufev);

	if (!lock) {
		if (underlying && BEV_UPCAST(underlying)->lock) {
			/* Share the lock of the underlying bufferevent. */
			lock = BEV_UPCAST(underlying)->lock;
		} else {
			EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
			if (!lock)
				return -1;
			owns_lock = 1;
		}
	}

	BEV_UPCAST(bufev)->lock = lock;
	BEV_UPCAST(bufev)->own_lock = owns_lock;

	evbuffer_enable_locking(bufev->input, lock);
	evbuffer_enable_locking(bufev->output, lock);

	if (underlying && !BEV_UPCAST(underlying)->lock)
		bufferevent_enable_locking_(underlying, lock);

	return 0;
#endif
}
/* Ask the backend, through its ctrl op, to use 'fd' (BEV_CTRL_SET_FD).
 * Returns the op's result, or -1 when the backend has no ctrl op. */
int
bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
{
	union bufferevent_ctrl_data data;
	int rc = -1;

	data.fd = fd;

	BEV_LOCK(bev);
	if (bev->be_ops->ctrl != NULL)
		rc = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &data);
	BEV_UNLOCK(bev);

	return rc;
}
evutil_socket_t
bufferevent_getfd(struct bufferevent *bev)
{
union bufferevent_ctrl_data d;
int res = -1;
d.fd = -1;
BEV_LOCK(bev);
if (bev->be_ops->ctrl)
res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
BEV_UNLOCK(bev);
return (res<0) ? -1 : d.fd;
}
static void
bufferevent_cancel_all_(struct bufferevent *bev)
{
union bufferevent_ctrl_data d;
memset(&d, 0, sizeof(d));
BEV_LOCK(bev);
if (bev->be_ops->ctrl)
bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
BEV_UNLOCK(bev);
}
/* Return the set of currently enabled events of bufev, read under the
 * lock. */
short
bufferevent_get_enabled(struct bufferevent *bufev)
{
	short enabled_events;

	BEV_LOCK(bufev);
	enabled_events = bufev->enabled;
	BEV_UNLOCK(bufev);

	return enabled_events;
}
struct bufferevent *
bufferevent_get_underlying(struct bufferevent *bev)
{
union bufferevent_ctrl_data d;
int res = -1;
d.ptr = NULL;
BEV_LOCK(bev);
if (bev->be_ops->ctrl)
res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
BEV_UNLOCK(bev);
return (res<0) ? NULL : d.ptr;
}
/* Event callback for the generic read timeout.  'ctx' is the
 * bufferevent: reading is disabled and the event callback is run with
 * BEV_EVENT_TIMEOUT|BEV_EVENT_READING, all while holding the lock and an
 * extra reference. */
static void
bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bufev = ctx;

	bufferevent_incref_and_lock_(bufev);

	bufferevent_disable(bufev, EV_READ);
	bufferevent_run_eventcb_(bufev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);

	bufferevent_decref_and_unlock_(bufev);
}
/* Event callback for the generic write timeout.  'ctx' is the
 * bufferevent: writing is disabled and the event callback is run with
 * BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, all while holding the lock and an
 * extra reference. */
static void
bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bufev = ctx;

	bufferevent_incref_and_lock_(bufev);

	bufferevent_disable(bufev, EV_WRITE);
	bufferevent_run_eventcb_(bufev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);

	bufferevent_decref_and_unlock_(bufev);
}
/* Set up bev's ev_read and ev_write events as timers on bev's event base
 * that run the generic read/write timeout callbacks with bev as their
 * argument. */
void
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
{
	struct event_base *base = bev->ev_base;

	evtimer_assign(&bev->ev_read, base,
	    bufferevent_generic_read_timeout_cb, bev);
	evtimer_assign(&bev->ev_write, base,
	    bufferevent_generic_write_timeout_cb, bev);
}
/* Remove both generic timeout events of bev.  Both event_del() calls are
 * always made; returns -1 if either one fails, 0 otherwise. */
int
bufferevent_del_generic_timeout_cbs_(struct bufferevent *bev)
{
	const int read_rc = event_del(&bev->ev_read);
	const int write_rc = event_del(&bev->ev_write);

	return (read_rc < 0 || write_rc < 0) ? -1 : 0;
}
/* Re-arm or remove the generic timeout events of bev to match its state.
 *
 * The read timer is added when reading is enabled, not suspended, and a
 * read timeout is set; otherwise it is removed.  The write timer follows
 * the same rule and additionally requires the output buffer to be
 * non-empty.  Returns -1 if either event_add()/event_del() call fails,
 * 0 otherwise. */
int
bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
{
	const short enabled = bev->enabled;
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int read_rc;
	int write_rc;

	if (!(enabled & EV_READ) || priv->read_suspended ||
	    !evutil_timerisset(&bev->timeout_read))
		read_rc = event_del(&bev->ev_read);
	else
		read_rc = event_add(&bev->ev_read, &bev->timeout_read);

	if (!(enabled & EV_WRITE) || priv->write_suspended ||
	    !evutil_timerisset(&bev->timeout_write) ||
	    !evbuffer_get_length(bev->output))
		write_rc = event_del(&bev->ev_write);
	else
		write_rc = event_add(&bev->ev_write, &bev->timeout_write);

	return (read_rc < 0 || write_rc < 0) ? -1 : 0;
}
/* Add 'ev' with the timeout 'tv'.  An all-zero timeval is passed to
 * event_add() as NULL instead.  Returns the result of event_add(). */
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv)
{
	const struct timeval *timeout = tv;

	if (tv->tv_sec == 0 && tv->tv_usec == 0)
		timeout = NULL;

	return event_add(ev, timeout);
}
/* Public locking entry point, meant for user programs only; code inside
   the library should call bufferevent_incref_and_lock_() or BEV_LOCK
   directly.  Takes the lock and a reference on bev. */
void
bufferevent_lock(struct bufferevent *bev)
{
	bufferevent_incref_and_lock_(bev);
}
/* Public unlocking entry point: drops one reference on bev and releases
 * its lock by calling bufferevent_decref_and_unlock_(). */
void
bufferevent_unlock(struct bufferevent *bev)
{
	bufferevent_decref_and_unlock_(bev);
}