From 60192b462563034cfa179c98f50a1bf51e0f3859 Mon Sep 17 00:00:00 2001
From: Niels Provos
Date: Sun, 26 Feb 2006 20:18:35 +0000
Subject: [PATCH] improved/well-completely rewritten rtsig support by Mathew
 Mills; fix some cases where regress would not pass on Linux

svn:r204
---
 rtsig.c        | 1160 +++++++++++++++++++++++++++++++++++-------------
 test/regress.c |   13 +-
 2 files changed, 864 insertions(+), 309 deletions(-)

diff --git a/rtsig.c b/rtsig.c
index 32a782cd..29aade69 100644
--- a/rtsig.c
+++ b/rtsig.c
@@ -1,3 +1,152 @@
+/*
+ * Copyright (c) 2006 Mathew Mills
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * Meta-level comments: You know that a kernel interface is wrong if
+ * supporting it requires three times more code than any of the other
+ * kernel interfaces supported in libevent.  Niels - 2006-02-22
+ */
+/**
+
+  "RTSIG" is shorthand for using O_ASYNC to make descriptors send
+  signals when readable/writable, and for using POSIX real-time
+  signals, which are queued, unlike normal signals.  At first blush
+  this may seem like an alternative to epoll, but a number of problems
+  arise when attempting to build an event loop entirely out of rtsig.
+  Still, we can use rtsig in combination with poll() to
+  provide an event loop that allows for many thousands of sockets
+  without the huge overheads implicit in using select() or poll()
+  alone.  epoll and kqueue are far superior to rtsig and should be
+  used where available, but rtsig has been in standard Linux kernels
+  for a long time and has a huge installation base.  epoll requires
+  special patches for 2.4 kernels, and 2.6 kernels are not yet nearly
+  so ubiquitous.
+
+  rtsig problems:
+  - O_ASYNC mechanisms work only on sockets - not pipes or tty's
+
+  - O_ASYNC signals are edge-triggered: POLLIN on a packet arriving
+    or socket close; POLLOUT when a socket transitions from
+    non-writable to writable.  Being edge-triggered means the
+    event-handler callbacks must transition the level ( completely
+    reading the socket buffer contents ) or they will be unable to
+    reliably receive notification again.
+
+  - rtsig implementations must be intimately involved in how a
+    process dispatches signals.
+
+  - delivering signals per-event can be expensive, CPU-wise, but
+    sigtimedwait() blocks on signals only, which means non-sockets
+    cannot be serviced.
+
+  Theory of operation:
+  This libevent module uses rtsig to allow us to manage a set of
+  poll-event descriptors.  We can drop uninteresting fd's from the
+  pollset if the fd will send a signal when it becomes interesting
+  again.
+
+  poll() offers us level-triggering and, when we have verified the
+  level of a socket, we can trust the edge-trigger nature of the
+  ASYNC signal.
+
+  As an event loop we must poll for external events but leverage
+  kernel functionality to sleep between events ( until the loop's
+  next scheduled timed event ).
+
+  If we are polling on any non-sockets then we simply have no choice
+  about blocking on the poll() call.  If we blocked on the
+  sigtimedwait() call as the rtsig papers recommend, we would not
+  wake on non-socket state transitions.  As part of libevent, this
+  module must support non-socket polling.
+
+  Many applications, however, do not need to poll on non-sockets and
+  so this module should be able to optimize this case by using
+  sigtimedwait().  For this reason this module can actually trigger
+  events in each of three different ways:
+  - poll() returning ready events from descriptors in the pollset
+
+  - real-time signals dequeued via sigtimedwait()
+
+  - real-time signals that call an installed signal handler which in
+    turn writes the contents of siginfo to one end of a socketpair
+    DGRAM socket.  The other end of the socket is always in the
+    pollset, so poll will be guaranteed to return even if the signal
+    is received before entering poll().
+
+  Non-socket descriptors force us to block on the poll() for the
+  duration of a dispatch.  In this case we unblock (w/ sigprocmask)
+  the managed signals just before polling.  Each managed signal is
+  handled by signal_handler(), which send()s the contents of siginfo
+  over the socketpair.  Otherwise, we call poll() with a timeout of
+  0 ms so it checks the levels of the fd's in the pollset and returns
+  immediately.  Any fd that is a socket and has no active state is
+  removed from the pollset for the next pass -- we will rely on
+  getting a signal for events on these fd's.
+
+  The receiving end of the siginfo socketpair is in the pollset
+  (permanently), so if we are polling on non-sockets, the delivery of
+  signals immediately following sigprocmask( SIG_UNBLOCK...) will
+  result in a readable op->signal_recv_fd, which ensures the poll()
+  will return immediately.  If the poll() call is blocking and a
+  signal arrives ( possibly a real-time signal from a socket not in
+  the pollset ), its handler will write the data to the socketpair
+  and interrupt the poll().
+
+  After every poll call we attempt a non-blocking recv from the
+  signal_recv_fd and continue to recv and dispatch the events until
+  recv indicates the socket buffer is empty.
+
+  One might raise concerns about receiving event activations from
+  both poll() and from the rtsig data in the signal_recv_fd.
+  Fortunately, libevent is already structured for event coalescing,
+  so this issue is mitigated ( though we do some work twice for the
+  same event, making us less efficient ).  I suspect that turning off
+  the O_ASYNC flag on fd's in the pollset would be more expensive
+  than handling some events twice.  Looking at the kernel's source
+  code for setting O_ASYNC, it appears to take a global kernel
+  lock...
+
+  After a poll and the recv-loop for the signal_recv_fd, we finally
+  do a sigtimedwait().  sigtimedwait will only block if we haven't
+  blocked in poll() and we have not enqueued events from either the
+  poll or the recv-loop.  Because sigtimedwait blocks all signals
+  that are not in the set of signals to be dequeued, we need to
+  dequeue almost all signals and make sure we dispatch them
+  correctly.  We dequeue any signal that is not blocked as well as
+  all libevent-managed signals.  If we get a signal that is not
+  managed by libevent we look up the sigaction for the specific
+  signal and call that function ourselves.
+
+  Finally, I should mention that getting a SIGIO signal indicates
+  that the rtsig buffer has overflowed and we have lost events.
+  This forces us to add _every_ descriptor to the pollset to recover.
+
+*/
+
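To make the F_SETSIG mechanism described above concrete, here is a minimal
standalone sketch of arming a descriptor (illustrative only; arm_async and
its error handling are this editor's invention, not code from the patch):

	#define _GNU_SOURCE	/* F_SETSIG is a Linux-specific fcntl extension */
	#include <fcntl.h>
	#include <unistd.h>

	static int
	arm_async(int fd, int signo)
	{
		int flags = fcntl(fd, F_GETFL);

		if (flags == -1)
			return (-1);
		/* queue `signo' (with siginfo) instead of plain SIGIO */
		if (fcntl(fd, F_SETSIG, signo) == -1)
			return (-1);
		/* deliver the signal to this process */
		if (fcntl(fd, F_SETOWN, (int) getpid()) == -1)
			return (-1);
		/* edge-triggered readiness signals from now on */
		return (fcntl(fd, F_SETFL, flags | O_ASYNC));
	}

After this call the kernel queues one real-time signal per readiness edge,
which is exactly the behavior the module combines with poll() below.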
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
@@ -20,104 +169,59 @@
 #include
 #include
 #include
-#ifndef HAVE_WORKING_RTSIG
-#include
-#endif
+#include
 #include
-
-#define EVLIST_X_NORT	0x1000	/* Skip RT signals (internal) */
+#include
 
 #include "event.h"
+#include "event-internal.h"
 #include "log.h"
 
 extern struct event_list signalqueue;
 
-struct rtsigop {
-	sigset_t sigs;
-	struct pollfd *poll;
-	struct event **toev;
-	int cur, max, total;
-#ifndef HAVE_WORKING_RTSIG
-	int pollmode;
+#include
+#ifndef __NR_gettid
+#define gettid() getpid()
+#else
+
+#if ((__GLIBC__ > 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ >= 3)))
+_syscall0(pid_t,gettid)
 #endif
+
+#endif
+
+#define EVLIST_NONSOCK	0x1000	/* event is for a non-socket file-descriptor */
+#define EVLIST_DONTDEL	0x2000	/* event should always be in the pollset */
+#define MAXBUFFERSIZE	(1024 * 1024 * 2) /* max socket buffer for the signal socketpair */
+#define INIT_MAX	16	/* initial/minimum # of fd positions in our pollset */
+
+static int signal_send_fd[_NSIG]; /* the global send end of the signal socketpair */
+static int trouble[_NSIG];	/* 1 when the signal handler can't send to signal_send_fd */
+
+struct rtdata;
+TAILQ_HEAD(rtdata_list, rtdata);
+
+struct rtsigop {
+	sigset_t sigs;		/* signal mask for all _managed_ signals */
+	struct pollfd *poll;	/* poll structures */
+	struct rtdata **ptodat;	/* map poll_position to rtdata */
+	int cur;		/* cur # of fd's in the pollset */
+	int max;		/* max # of fd's in the pollset; starts at 16 and grows as needed */
+	int total;		/* count of fd's we are watching now */
+	int signo;		/* the signo we use for ASYNC fd notifications */
+	int nonsock;		/* number of non-socket fd's we are watching */
+	int highestfd;		/* highest fd accommodated by fdtodat */
+	struct rtdata_list **fdtodat;	/* map fd to rtdata ( and thus to event ) */
+	int signal_recv_fd;	/* recv side of the signal socketpair */
+	int signal_send_fd;	/* send side of the signal socketpair */
+	struct event sigfdev;	/* our own event structure for the signal fd */
 };
 
-#define INIT_MAX 16
-
-static int
-poll_add(struct rtsigop *op, struct event *ev)
-{
-	struct pollfd *pfd;
-
-	if (op->poll == NULL) return 0;
-
-	if (op->cur == op->max) {
-		void *p;
-
-		p = realloc(op->poll, sizeof(*op->poll) * (op->max << 1));
-		if (!p) {
-			errno = ENOMEM;
-			return -1;
-		}
-		op->poll = p;
-		p = realloc(op->toev, sizeof(*op->toev) * (op->max << 1));
-		if (!p) {
-			op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
-			errno = ENOMEM;
-			return -1;
-		}
-		op->toev = p;
-		op->max <<= 1;
-	}
-
-	pfd = &op->poll[op->cur];
-	pfd->fd = ev->ev_fd;
-	pfd->events = 0;
-	if (ev->ev_events & EV_READ) pfd->events |= POLLIN;
-	if (ev->ev_events & EV_WRITE) pfd->events |= POLLOUT;
-	pfd->revents = 0;
-
-	op->toev[op->cur] = ev;
-	op->cur++;
-
-	return 0;
-}
-
-static void
-poll_free(struct rtsigop *op, int n)
-{
-	if (op->poll == NULL) return;
-
-	op->cur--;
-	if (n < op->cur) {
-		memcpy(&op->poll[n], &op->poll[op->cur], sizeof(*op->poll));
-		op->toev[n] = op->toev[op->cur];
-	}
-	if (op->max > INIT_MAX && op->cur < op->max >> 1) {
-		op->max >>= 1;
-		op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
-		op->toev = realloc(op->toev, sizeof(*op->toev) * op->max);
-	}
-}
-
-static void
-poll_remove(struct rtsigop *op, struct event *ev)
-{
-	int i;
-
-	for (i = 0; i < op->cur; i++) {
-		if (op->toev[i] == ev) {
-			poll_free(op, i);
-			break;
-		}
-	}
-}
-
-static void
-activate(struct event *ev, int flags)
-{
-	if (!(ev->ev_events & EV_PERSIST)) event_del(ev);
-	event_active(ev, flags, 1);
-}
+struct rtdata {
+	/* rtdata holds rtsig-private state on each event */
+	TAILQ_ENTRY (rtdata) next;
+	struct event *ev;
+	int poll_position;
+};
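The bookkeeping above ties three structures together: op->poll[i] (the dense
pollfd array), op->ptodat[i] (its parallel rtdata array), and op->fdtodat[fd]
(a TAILQ of rtdata, one per event, since one fd may carry separate read and
write events).  A hypothetical consistency check -- not part of the patch,
purely to illustrate the invariants the later functions maintain -- might
read:

	#include <assert.h>

	/* assumes the struct rtsigop / struct rtdata definitions above */
	static void
	check_map_invariants(struct rtsigop *op)
	{
		int i;

		for (i = 0; i < op->cur; i++) {
			struct rtdata *dat = op->ptodat[i];

			/* poll slot i and its rtdata always point at each other */
			assert(dat->poll_position == i);
			/* and both agree on which descriptor is being watched */
			assert(op->poll[i].fd == dat->ev->ev_fd);
		}
	}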
 
 void *rtsig_init(void);
 int rtsig_add(void *, struct event *);
@@ -126,46 +230,404 @@ int rtsig_recalc(struct event_base *, void *, int);
 int rtsig_dispatch(struct event_base *, void *, struct timeval *);
 
 struct eventop rtsigops = {
-	"rtsig",
-	rtsig_init,
-	rtsig_add,
-	rtsig_del,
-	rtsig_recalc,
-	rtsig_dispatch
+	"rtsig",
+	rtsig_init,
+	rtsig_add,
+	rtsig_del,
+	rtsig_recalc,
+	rtsig_dispatch
 };
 
+static void
+signal_handler(int sig, siginfo_t *info, void *ctx)
+{
+	/*
+	 * the signal handler for all libevent-managed signals; only
+	 * used if we need to do a blocking poll() call due to
+	 * non-socket fd's in the pollset.
+	 */
+
+	siginfo_t *i = info;
+	siginfo_t i_local;
+
+	if (trouble[sig - 1]) {
+		i_local.si_signo = SIGIO;
+		i_local.si_errno = 0;
+		i_local.si_code = 0;
+		i = &i_local;
+		trouble[sig - 1] = 0;
+	}
+
+	if (send(signal_send_fd[sig - 1], i, sizeof(*i),
+		 MSG_DONTWAIT|MSG_NOSIGNAL) == -1)
+		trouble[sig - 1] = 1;
+}
+
+static void
+donothing(int fd, short event, void *arg)
+{
+	/*
+	 * callback for our signal_recv_fd event structure;
+	 * we don't want to act on these events, we just want to wake
+	 * the poll()
+	 */
+};
+
+static void
+signotset(sigset_t *set)
+{
+	/* flip every bit of a sigset; assumes the set is an array of
+	 * 32-bit words */
+	int i, l;
+	l = sizeof(*set) / 4;
+	for (i = 0; i < l; i++) {
+		((unsigned *)set)[i] = ~((unsigned *)set)[i];
+	}
+}
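signal_handler() is a queued-signal variant of the classic self-pipe trick:
the only work it does is one send() of the raw siginfo to a socketpair whose
other end sits permanently in the pollset.  Reduced to its essentials (an
illustrative sketch with hypothetical names, not code from this patch):

	#include <signal.h>
	#include <sys/socket.h>

	static int wake_fd;	/* send end; the recv end lives in the pollset */

	static void
	forward_siginfo(int sig, siginfo_t *info, void *ctx)
	{
		/*
		 * One non-blocking send and nothing else, keeping the
		 * handler async-signal-safe.  If the socket buffer is full
		 * the siginfo is lost here; the patch records that in
		 * trouble[] and substitutes a SIGIO on the next delivery to
		 * force pollset recovery.
		 */
		send(wake_fd, info, sizeof(*info), MSG_DONTWAIT | MSG_NOSIGNAL);
	}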
+
+/* The next three functions manage our private data about each event struct */
+
+static int
+grow_fdset(struct rtsigop *op, int newhigh)
+{
+	/*
+	 * grow the fd -> rtdata array because we have encountered a
+	 * new fd too high to fit in the existing array
+	 */
+
+	struct rtdata_list **p;
+	struct rtdata_list *datset;
+	int i, x;
+	int newcnt = (newhigh + 1) << 1;
+
+	if (newhigh <= op->highestfd)
+		return (0);
+
+	p = op->fdtodat;
+	p = realloc(op->fdtodat, sizeof(struct rtdata_list *) * newcnt);
+	if (p == NULL)
+		return (-1);
+	op->fdtodat = p;
+
+	datset = calloc(newcnt - (op->highestfd + 1),
+	    sizeof(struct rtdata_list));
+	if (datset == NULL)
+		return (-1);
+
+	for (i = op->highestfd + 1, x = 0; i < newcnt; i++, x++) {
+		op->fdtodat[i] = &(datset[x]);
+		TAILQ_INIT(op->fdtodat[i]);
+	}
+
+	op->highestfd = newcnt - 1;
+	return (0);
+}
+
+static struct rtdata *
+ev2dat(struct rtsigop *op, struct event *ev, int create)
+{
+	/*
+	 * given an event struct, find the rtdata structure that
+	 * corresponds to it.  If create is non-zero and the rtdata
+	 * structure does not exist, create it.  Return NULL if not
+	 * found.
+	 */
+
+	int found = 0;
+	int fd = ev->ev_fd;
+	struct rtdata *ret = NULL;
+
+	if (op->highestfd < fd && create)
+		if (grow_fdset(op, fd) == -1)
+			return (NULL);
+
+	TAILQ_FOREACH(ret, op->fdtodat[fd], next) {
+		if (ret->ev == ev) {
+			found = 1;
+			break;
+		}
+	}
+
+	if (!found) {
+		if (!create)
+			return (NULL);
+
+		ret = calloc(1, sizeof(struct rtdata));
+		if (ret == NULL)
+			return (NULL);
+		ret->ev = ev;
+		ret->poll_position = -1;
+		TAILQ_INSERT_TAIL(op->fdtodat[fd], ret, next);
+	}
+
+	return (ret);
+}
+
+static void
+dat_del(struct rtsigop *op, struct rtdata *dat)
+{
+	/*
+	 * delete our private notes about a given event struct;
+	 * called from rtsig_del() only
+	 */
+	int fd;
+
+	if (dat == NULL)
+		return;
+	fd = dat->ev->ev_fd;
+
+	TAILQ_REMOVE(op->fdtodat[fd], dat, next);
+	memset(dat, 0, sizeof(*dat));
+	free(dat);
+}
+
+static void
+set_sigaction(int sig)
+{
+	/*
+	 * set the standard handler for any libevent-managed signal,
+	 * including the rtsig used for O_ASYNC notifications
+	 */
+	struct sigaction act;
+
+	act.sa_flags = SA_RESTART | SA_SIGINFO;
+	sigfillset(&(act.sa_mask));
+	act.sa_sigaction = &signal_handler;
+	sigaction(sig, &act, NULL);
+}
+
+static int
+find_rt_signal()
+{
+	/* find an unused real-time signal */
+	struct sigaction act;
+	int sig = SIGRTMIN;
+
+	while (sig <= SIGRTMAX) {
+		if (sigaction(sig, NULL, &act) != 0) {
+			if (errno == EINTR)
+				continue;
+		} else {
+			if (act.sa_flags & SA_SIGINFO) {
+				if (act.sa_sigaction == NULL)
+					return (sig);
+			} else {
+				if (act.sa_handler == SIG_DFL)
+					return (sig);
+			}
+		}
+		sig++;
+	}
+	return (0);
+}
+
+/*
+ * the next three functions manage our pollset and the memory management for
+ * the fd -> rtdata -> event -> poll_position maps
+ */
+
+static int
+poll_add(struct rtsigop *op, struct event *ev, struct rtdata *dat)
+{
+	struct pollfd *pfd;
+	int newmax = op->max << 1;
+	int pp;
+
+	if (op->poll == NULL)
+		return (0);
+
+	if (dat == NULL)
+		dat = ev2dat(op, ev, 0);
+
+	if (dat == NULL)
+		return (0);
+
+	pp = dat->poll_position;
+
+	if (pp != -1) {
+		pfd = &op->poll[pp];
+		if (ev->ev_events & EV_READ)
+			pfd->events |= POLLIN;
+
+		if (ev->ev_events & EV_WRITE)
+			pfd->events |= POLLOUT;
+
+		return (0);
+	}
+
+	if (op->cur == op->max) {
+		void *p = realloc(op->poll, sizeof(*op->poll) * newmax);
+		if (p == NULL) {
+			errno = ENOMEM;
+			return (-1);
+		}
+		op->poll = p;
+
+		p = realloc(op->ptodat, sizeof(*op->ptodat) * newmax);
+		if (p == NULL) {
+			/* shrink the pollset back down */
+			op->poll = realloc(op->poll,
+			    sizeof(*op->poll) * op->max);
+			errno = ENOMEM;
+			return (-1);
+		}
+		op->ptodat = p;
+		op->max = newmax;
+	}
+
+	pfd = &op->poll[op->cur];
+	pfd->fd = ev->ev_fd;
+	pfd->revents = 0;
+	pfd->events = 0;
+
+	if (ev->ev_events & EV_READ)
+		pfd->events |= POLLIN;
+
+	if (ev->ev_events & EV_WRITE)
+		pfd->events |= POLLOUT;
+
+	op->ptodat[op->cur] = dat;
+	dat->poll_position = op->cur;
+	op->cur++;
+
+	return (0);
+}
+
+static void
+poll_free(struct rtsigop *op, int n)
+{
+	if (op->poll == NULL)
+		return;
+
+	op->cur--;
+
+	if (n < op->cur) {
+		memcpy(&op->poll[n], &op->poll[op->cur], sizeof(*op->poll));
+		op->ptodat[n] = op->ptodat[op->cur];
+		op->ptodat[n]->poll_position = n;
+	}
+
+	/* less than half the max in use causes us to shrink */
+	if (op->max > INIT_MAX && op->cur < op->max >> 1) {
+		op->max >>= 1;
+		op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
+		op->ptodat = realloc(op->ptodat, sizeof(*op->ptodat) * op->max);
+	}
+}
+
+static void
+poll_remove(struct rtsigop *op, struct event *ev, struct rtdata *dat)
+{
+	int pp;
+
+	if (dat == NULL)
+		dat = ev2dat(op, ev, 0);
+
+	if (dat == NULL)
+		return;
+
+	pp = dat->poll_position;
+	if (pp != -1) {
+		poll_free(op, pp);
+		dat->poll_position = -1;
+	}
+}
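poll_free() keeps the pollfd array dense by moving the last live slot into
the freed hole, which is why it must also refresh that entry's
poll_position.  Stripped of the surrounding bookkeeping, the idiom is plain
unordered O(1) array removal (a generic sketch, not patch code):

	#include <poll.h>

	static void
	dense_remove(struct pollfd *arr, int *count, int n)
	{
		(*count)--;
		if (n < *count)
			arr[n] = arr[*count]; /* order is irrelevant to poll() */
		/*
		 * anything that remembers positions (here, rtdata's
		 * poll_position) must be told that slot *count moved to n
		 */
	}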
+
+static void
+activate(struct event *ev, int flags)
+{
+	/* activate an event, possibly removing one-shot events */
+	if (!(ev->ev_events & EV_PERSIST))
+		event_del(ev);
+	event_active(ev, flags, 1);
+}
+
+#define FD_CLOSEONEXEC(x) do { \
+        if (fcntl(x, F_SETFD, 1) == -1) \
+                event_warn("fcntl(%d, F_SETFD)", x); \
+} while (0)
+
 void *
 rtsig_init(void)
 {
 	struct rtsigop *op;
+	int sockets[2];
+	int optarg;
+	struct rtdata *dat;
+	int flags;
 
 	if (getenv("EVENT_NORTSIG"))
-		return (NULL);
+		goto err;
 
-	op = malloc(sizeof(*op));
-	if (op == NULL) return (NULL);
-
-	memset(op, 0, sizeof(*op));
+	op = calloc(1, sizeof(*op));
+	if (op == NULL)
+		goto err;
 
 	op->max = INIT_MAX;
 	op->poll = malloc(sizeof(*op->poll) * op->max);
-	if (op->poll == NULL) {
-		free(op);
-		return (NULL);
-	}
-	op->toev = malloc(sizeof(*op->toev) * op->max);
-	if (op->toev == NULL) {
-		free(op->poll);
-		free(op);
-		return (NULL);
-	}
+	if (op->poll == NULL)
+		goto err_free_op;
+
+	op->signo = find_rt_signal();
+	if (op->signo == 0)
+		goto err_free_poll;
+
+	op->nonsock = 0;
+
+	if (socketpair(PF_UNIX, SOCK_DGRAM, 0, sockets) != 0)
+		goto err_free_poll;
+
+	FD_CLOSEONEXEC(sockets[0]);
+	FD_CLOSEONEXEC(sockets[1]);
+
+	signal_send_fd[op->signo - 1] = sockets[0];
+	trouble[op->signo - 1] = 0;
+	op->signal_send_fd = sockets[0];
+	op->signal_recv_fd = sockets[1];
+	flags = fcntl(op->signal_recv_fd, F_GETFL);
+	fcntl(op->signal_recv_fd, F_SETFL, flags | O_NONBLOCK);
+
+	optarg = MAXBUFFERSIZE;
+	setsockopt(signal_send_fd[op->signo - 1],
+	    SOL_SOCKET, SO_SNDBUF,
+	    &optarg, sizeof(optarg));
+
+	optarg = MAXBUFFERSIZE;
+	setsockopt(op->signal_recv_fd,
+	    SOL_SOCKET, SO_RCVBUF,
+	    &optarg, sizeof(optarg));
+
+	op->highestfd = -1;
+	op->fdtodat = NULL;
+	if (grow_fdset(op, 1) == -1)
+		goto err_close_pair;
+
+	op->ptodat = malloc(sizeof(*op->ptodat) * op->max);
+	if (op->ptodat == NULL)
+		goto err_close_pair;
 
 	sigemptyset(&op->sigs);
 	sigaddset(&op->sigs, SIGIO);
-	sigaddset(&op->sigs, SIGRTMIN);
+	sigaddset(&op->sigs, op->signo);
 	sigprocmask(SIG_BLOCK, &op->sigs, NULL);
+	set_sigaction(SIGIO);
+	set_sigaction(op->signo);
+
+	event_set(&(op->sigfdev), op->signal_recv_fd, EV_READ|EV_PERSIST,
+	    donothing, NULL);
+	op->sigfdev.ev_flags |= EVLIST_DONTDEL;
+	dat = ev2dat(op, &(op->sigfdev), 1);
+	poll_add(op, &(op->sigfdev), dat);
 
 	return (op);
+
+ err_close_pair:
+	close(op->signal_recv_fd);
+	close(signal_send_fd[op->signo - 1]);
+
+ err_free_poll:
+	free(op->poll);
+
+ err_free_op:
+	free(op);
+ err:
+	return (NULL);
 }
 
 int
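Nothing outside this file calls rtsig_init() directly: it is reached through
libevent's normal backend selection, and the EVENT_NORTSIG variable checked
above is how the regression suite forces other backends.  For context, a
caller only ever sees the usual libevent 1.x surface (a sketch; fd 0 stands
in for any readable descriptor):

	#include <event.h>

	static void
	on_ready(int fd, short what, void *arg)
	{
		/* invoked by whichever backend event_init() selected */
	}

	int
	main(void)
	{
		struct event ev;

		event_init();	/* may pick rtsigops on Linux */
		event_set(&ev, 0, EV_READ | EV_PERSIST, on_ready, NULL);
		event_add(&ev, NULL);	/* no timeout */
		return (event_dispatch());
	}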
@@ -173,79 +635,102 @@ rtsig_add(void *arg, struct event *ev)
 {
 	struct rtsigop *op = (struct rtsigop *) arg;
 	int flags, i;
-#ifndef HAVE_WORKING_RTSIG
-	struct stat st;
-#endif
+	struct stat statbuf;
+	struct rtdata *dat;
 
 	if (ev->ev_events & EV_SIGNAL) {
+		int signo = EVENT_SIGNAL(ev);
+
 		sigaddset(&op->sigs, EVENT_SIGNAL(ev));
-		return sigprocmask(SIG_BLOCK, &op->sigs, NULL);
+		if (sigprocmask(SIG_BLOCK, &op->sigs, NULL) == -1)
+			return (-1);
+
+		set_sigaction(signo);
+
+		signal_send_fd[signo - 1] = op->signal_send_fd;
+		trouble[signo - 1] = 0;
+
+		return (0);
 	}
 
-	if (!(ev->ev_events & (EV_READ | EV_WRITE))) return 0;
+	if (!(ev->ev_events & (EV_READ|EV_WRITE)))
+		return (0);
 
-#ifndef HAVE_WORKING_RTSIG
-	if (fstat(ev->ev_fd, &st) == -1) return -1;
-	if (S_ISFIFO(st.st_mode)) {
-		ev->ev_flags |= EVLIST_X_NORT;
-		op->pollmode++;
-	}
-#endif
+	if (-1 == fstat(ev->ev_fd, &statbuf))
+		return (-1);
+
+	if (!S_ISSOCK(statbuf.st_mode))
+		ev->ev_flags |= EVLIST_NONSOCK;
 
 	flags = fcntl(ev->ev_fd, F_GETFL);
 	if (flags == -1)
 		return (-1);
 
 	if (!(flags & O_ASYNC)) {
-		if (fcntl(ev->ev_fd, F_SETSIG, SIGRTMIN) == -1
-		    || fcntl(ev->ev_fd, F_SETOWN, (int) getpid()) == -1)
+		if (fcntl(ev->ev_fd, F_SETSIG, op->signo) == -1 ||
+		    fcntl(ev->ev_fd, F_SETOWN, (int) gettid()) == -1)
 			return (-1);
 
-		if (fcntl(ev->ev_fd, F_SETFL, flags | O_ASYNC))
+		/*
+		 * the overhead of always handling writeable edges
+		 * isn't going to be that bad...
+		 */
+		if (fcntl(ev->ev_fd, F_SETFL, flags | O_ASYNC|O_RDWR))
 			return (-1);
 	}
 
 #ifdef O_ONESIGFD
+	/*
+	 * F_SETAUXFL and O_ONESIGFD are defined in a non-standard
+	 * Linux kernel patch to coalesce events for fds
+	 */
 	fcntl(ev->ev_fd, F_SETAUXFL, O_ONESIGFD);
 #endif
 
+	dat = ev2dat(op, ev, 1);
+	if (dat == NULL)
+		return (-1);
+
 	op->total++;
-	if (poll_add(op, ev) == -1)
-		goto err;
+	if (ev->ev_flags & EVLIST_NONSOCK)
+		op->nonsock++;
+
+	/* must check the level of new fd's */
+	if (poll_add(op, ev, dat) == -1) {
+		i = errno;
+		fcntl(ev->ev_fd, F_SETFL, flags);
+		errno = i;
+		return (-1);
+	}
 
 	return (0);
-
- err:
-	i = errno;
-	fcntl(ev->ev_fd, F_SETFL, flags);
-	errno = i;
-	return (-1);
 }
 
 int
 rtsig_del(void *arg, struct event *ev)
 {
+	struct rtdata *dat;
 	struct rtsigop *op = (struct rtsigop *) arg;
 
 	if (ev->ev_events & EV_SIGNAL) {
 		sigset_t sigs;
 
 		sigdelset(&op->sigs, EVENT_SIGNAL(ev));
 
 		sigemptyset(&sigs);
 		sigaddset(&sigs, EVENT_SIGNAL(ev));
 		return (sigprocmask(SIG_UNBLOCK, &sigs, NULL));
 	}
 
-	if (!(ev->ev_events & (EV_READ | EV_WRITE)))
+	if (!(ev->ev_events & (EV_READ|EV_WRITE)))
 		return (0);
 
-#ifndef HAVE_WORKING_RTSIG
-	if (ev->ev_flags & EVLIST_X_NORT)
-		op->pollmode--;
-#endif
-	poll_remove(op, ev);
+	dat = ev2dat(op, ev, 0);
+	poll_remove(op, ev, dat);
+	dat_del(op, dat);
 	op->total--;
+	if (ev->ev_flags & EVLIST_NONSOCK)
+		op->nonsock--;
 
 	return (0);
 }
 
 int
@@ -253,7 +738,210 @@ rtsig_del(void *arg, struct event *ev)
 rtsig_recalc(struct event_base *base, void *arg, int max)
 {
-	return (0);
+	return (0);
+}
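rtsig_add() demotes any descriptor that fstat() says is not a socket to the
EVLIST_NONSOCK slow path, because O_ASYNC never fires for pipes or ttys.
The test in isolation (an illustrative helper, not part of the patch):

	#include <sys/stat.h>

	/*
	 * 1 = fd cannot produce O_ASYNC signals (forces a blocking poll),
	 * 0 = socket, -1 = error
	 */
	static int
	needs_blocking_poll(int fd)
	{
		struct stat st;

		if (fstat(fd, &st) == -1)
			return (-1);
		return (!S_ISSOCK(st.st_mode));
	}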
+
+/*
+ * the following do_X functions implement the different stages of a single
+ * event-loop pass: poll(), recv(sigsock), sigtimedwait()
+ *
+ * do_siginfo_dispatch() is common to both do_sigwait() and
+ * do_signals_from_socket().
+ */
+
+static inline int
+do_poll(struct rtsigop *op, struct timespec *ts)
+{
+	int res = 0;
+	int i = 0;
+
+	if (op->cur > 1) {
+		/* non-empty poll set (modulo the signalfd) */
+
+		if (op->nonsock) {
+			int timeout = ts->tv_nsec / 1000000 + ts->tv_sec * 1000;
+
+			sigprocmask(SIG_UNBLOCK, &(op->sigs), NULL);
+
+			res = poll(op->poll, op->cur, timeout);
+
+			sigprocmask(SIG_BLOCK, &(op->sigs), NULL);
+
+			ts->tv_sec = 0;
+			ts->tv_nsec = 0;
+		} else {
+			res = poll(op->poll, op->cur, 0);
+		}
+
+		if (res < 0) {
+			return (errno == EINTR ? 0 : -1);
+		} else if (res) {
+			ts->tv_sec = 0;
+			ts->tv_nsec = 0;
+		}
+
+		i = 0;
+		while (i < op->cur) {
+			struct rtdata *dat = op->ptodat[i];
+			struct event *ev = dat->ev;
+
+			if (op->poll[i].revents) {
+				int flags = 0;
+
+				if (op->poll[i].revents & (POLLIN | POLLERR))
+					flags |= EV_READ;
+
+				if (op->poll[i].revents & POLLOUT)
+					flags |= EV_WRITE;
+
+				if (!(ev->ev_events & EV_PERSIST)) {
+					poll_remove(op, ev, op->ptodat[i]);
+					event_del(ev);
+				} else {
+					i++;
+				}
+
+				event_active(ev, flags, 1);
+			} else {
+				if (ev->ev_flags & (EVLIST_NONSOCK|EVLIST_DONTDEL)) {
+					i++;
+				} else {
+					poll_remove(op, ev, op->ptodat[i]);
+				}
+			}
+		}
+	}
+	return (res);
+}
+
+static inline int
+do_siginfo_dispatch(struct event_base *base, struct rtsigop *op,
+    siginfo_t *info)
+{
+	int signum;
+	struct rtdata *dat, *next_dat;
+	struct event *ev, *next_ev;
+
+	if (info == NULL)
+		return (-1);
+
+	signum = info->si_signo;
+	if (signum == op->signo) {
+		int flags, sigok = 0;
+
+		flags = 0;
+
+		if (info->si_band & (POLLIN|POLLERR))
+			flags |= EV_READ;
+		if (info->si_band & POLLOUT)
+			flags |= EV_WRITE;
+
+		if (!flags)
+			return (0);
+
+		if (info->si_fd > op->highestfd)
+			return (-1);
+
+		dat = TAILQ_FIRST(op->fdtodat[info->si_fd]);
+		while (dat != TAILQ_END(op->fdtodat[info->si_fd])) {
+			next_dat = TAILQ_NEXT(dat, next);
+			if (flags & dat->ev->ev_events) {
+				ev = dat->ev;
+				poll_add(op, ev, dat);
+				activate(ev, flags & ev->ev_events);
+				sigok = 1;
+			}
+			dat = next_dat;
+		}
+	} else if (signum == SIGIO) {
+		/* the rtsig queue overflowed; put every fd back in the pollset */
+		TAILQ_FOREACH(ev, &base->eventqueue, ev_next) {
+			if (ev->ev_events & (EV_READ|EV_WRITE))
+				poll_add(op, ev, NULL);
+		}
+		return (1);	/* 1 means the caller should poll() again */
+
+	} else if (sigismember(&op->sigs, signum)) {
+		/* managed signals are queued */
+		ev = TAILQ_FIRST(&signalqueue);
+		while (ev != TAILQ_END(&signalqueue)) {
+			next_ev = TAILQ_NEXT(ev, ev_signal_next);
+			if (EVENT_SIGNAL(ev) == signum)
+				activate(ev, EV_SIGNAL);
+			ev = next_ev;
+		}
+	} else {
+		/* dispatch unmanaged signals immediately */
+		struct sigaction sa;
+
+		if (sigaction(signum, NULL, &sa) == 0) {
+			if ((sa.sa_flags & SA_SIGINFO) && sa.sa_sigaction) {
+				(*sa.sa_sigaction)(signum, info, NULL);
+			} else if (sa.sa_handler) {
+				if ((int)sa.sa_handler != 1) /* not SIG_IGN */
+					(*sa.sa_handler)(signum);
+			} else {
+				if (signum != SIGCHLD) {
+					/* non-blocked SIG_DFL */
+					kill(gettid(), signum);
+				}
+			}
+		}
+	}
+
+	return (0);
+}
+
+/*
+ * return 1 if we should poll again
+ * return 0 if we are all set
+ * return -1 on error
+ */
+static inline int
+do_sigwait(struct event_base *base, struct rtsigop *op, struct timespec *ts,
+    sigset_t *sigs)
+{
+	for (;;) {
+		siginfo_t info;
+		int signum;
+
+		signum = sigtimedwait(sigs, &info, ts);
+
+		ts->tv_sec = 0;
+		ts->tv_nsec = 0;
+
+		if (signum == -1) {
+			if (errno == EAGAIN || errno == EINTR)
+				return (0);
+			return (-1);
+		} else if (1 == do_siginfo_dispatch(base, op, &info)) {
+			return (1);
+		}
+	}
+
+	/* NOTREACHED */
+}
+
+static inline int
+do_signals_from_socket(struct event_base *base, struct rtsigop *op,
+    struct timespec *ts)
+{
+	int fd = op->signal_recv_fd;
+	siginfo_t info;
+	int res;
+
+	for (;;) {
+		res = recv(fd, &info, sizeof(info), MSG_NOSIGNAL);
+		if (res == -1) {
+			if (errno == EAGAIN)
+				return (0);
+			if (errno == EINTR)
+				continue;
+			return (-1);
+		} else {
+			ts->tv_sec = 0;
+			ts->tv_nsec = 0;
+			if (1 == do_siginfo_dispatch(base, op, &info))
+				return (1);
+		}
+	}
+	/* NOTREACHED */
 }
 
 int
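On the dequeue side, each queued real-time signal carries the ready
descriptor in si_fd and poll()-style readiness bits in si_band, which is
what do_siginfo_dispatch() decodes above.  A stripped-down standalone
consumer looks roughly like this (a sketch; it assumes the fd was armed
with F_SETSIG as shown earlier, and the Linux-specific siginfo fields):

	#define _GNU_SOURCE	/* si_fd / si_band on Linux */
	#include <signal.h>
	#include <poll.h>
	#include <stdio.h>

	static void
	drain_queued_rtsigs(const sigset_t *sigs, int signo)
	{
		siginfo_t info;
		struct timespec ts = { 0, 0 };	/* only drain; never block */

		while (sigtimedwait(sigs, &info, &ts) == signo) {
			if (info.si_band & (POLLIN | POLLERR))
				printf("fd %d: readable edge\n", info.si_fd);
			if (info.si_band & POLLOUT)
				printf("fd %d: writable edge\n", info.si_fd);
		}
	}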
@@ -261,175 +949,37 @@ rtsig_dispatch(struct event_base *base, void *arg, struct timeval *tv)
 {
 	struct rtsigop *op = (struct rtsigop *) arg;
 	struct timespec ts;
-	int res, i;
+	int res;
+	sigset_t sigs;
 
-	if (op->poll == NULL)
-		goto retry_poll;
-#ifndef HAVE_WORKING_RTSIG
-	if (op->pollmode)
-		goto poll_all;
-#endif
+	ts.tv_sec = tv->tv_sec;
+	ts.tv_nsec = tv->tv_usec * 1000;
 
-	if (op->cur) {
-		ts.tv_sec = ts.tv_nsec = 0;
-	} else {
-		ts.tv_sec = tv->tv_sec;
-		ts.tv_nsec = tv->tv_usec * 1000;
-	}
+ poll_for_level:
+	res = do_poll(op, &ts); /* ts can be modified in do_XXX() */
 
-	for (;;) {
-		siginfo_t info;
-		struct event *ev;
-		int signum;
-
-		signum = sigtimedwait(&op->sigs, &info, &ts);
-
-		if (signum == -1) {
-			if (errno == EAGAIN)
-				break;
-			return (errno == EINTR ? 0 : -1);
-		}
-
-		ts.tv_sec = ts.tv_nsec = 0;
-
-		if (signum == SIGIO) {
-#ifndef HAVE_WORKING_RTSIG
-		poll_all:
-#endif
-			free(op->poll);
-			free(op->toev);
-		retry_poll:
-			op->cur = 0;
-			op->max = op->total;
-			op->poll = malloc(sizeof(*op->poll) * op->total);
-			if (op->poll == NULL)
-				return (-1);
-			op->toev = malloc(sizeof(*op->toev) * op->total);
-			if (op->toev == NULL) {
-				free(op->poll);
-				op->poll = NULL;
-				return (-1);
-			}
-
-			TAILQ_FOREACH(ev, &base->eventqueue, ev_next)
-				if (!(ev->ev_flags & EVLIST_X_NORT))
-					poll_add(op, ev);
-
-			break;
-		}
-
-		if (signum == SIGRTMIN) {
-			int flags, i, sigok = 0;
-
-			if (info.si_band <= 0) {	/* SI_SIGIO */
-				flags = EV_READ | EV_WRITE;
-			} else {
-				flags = 0;
-				if (info.si_band & POLLIN) flags |= EV_READ;
-				if (info.si_band & POLLOUT) flags |= EV_WRITE;
-				if (!flags) continue;
-			}
-
-			for (i = 0; flags && i < op->cur; i++) {
-				ev = op->toev[i];
-
-				if (ev->ev_fd == info.si_fd) {
-					flags &= ~ev->ev_events;
-					sigok = 1;
-				}
-			}
-
-			for (ev = TAILQ_FIRST(&base->eventqueue);
-			    flags && ev != TAILQ_END(&base->eventqueue);
-			    ev = TAILQ_NEXT(ev, ev_next)) {
-				if (ev->ev_fd == info.si_fd) {
-					if (flags & ev->ev_events) {
-						i = poll_add(op, ev);
-						if (i == -1) return -1;
-						flags &= ~ev->ev_events;
-					}
-					sigok = 1;
-				}
-			}
-
-			if (!sigok) {
-				flags = fcntl(info.si_fd, F_GETFL);
-				if (flags == -1) return -1;
-				fcntl(info.si_fd, F_SETFL, flags & ~O_ASYNC);
-			}
-		} else {
-			TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
-				if (EVENT_SIGNAL(ev) == signum)
-					activate(ev, EV_SIGNAL);
-			}
-		}
-	}
-
-	if (!op->cur)
-		return (0);
-
-	res = poll(op->poll, op->cur, tv->tv_sec * 1000 +
-	    (tv->tv_usec + 999) / 1000);
-	if (res < 0)
+	res = do_signals_from_socket(base, op, &ts);
+	if (res == 1)
+		goto poll_for_level;
+	else if (res == -1)
 		return (-1);
 
-	i = 0;
-#ifdef HAVE_WORKING_RTSIG
-	while (i < res) {
-#else
-	while (i < op->cur) {
-#endif
-		if (op->poll[i].revents) {
-			int flags = 0;
-			struct event *ev = op->toev[i];
+	/*
+	 * the mask = managed signals | unblocked signals
	 * MM - if this is not blocking, do we need to cast the net this wide?
+ */ + sigemptyset(&sigs); + sigprocmask(SIG_BLOCK, &sigs, &sigs); + signotset(&sigs); + sigorset(&sigs, &sigs, &op->sigs); - if (op->poll[i].revents & POLLIN) - flags |= EV_READ; - if (op->poll[i].revents & POLLOUT) - flags |= EV_WRITE; + res = do_sigwait(base, op, &ts, &sigs); - if (!(ev->ev_events & EV_PERSIST)) { - event_del(ev); - res--; - } else { - i++; - } - event_active(ev, flags, 1); - } else { -#ifndef HAVE_WORKING_RTSIG - if (op->toev[i]->ev_flags & EVLIST_X_NORT) { - i++; - res++; - continue; - } -#endif - for (;;) { - op->cur--; - if (i == op->cur) - break; - if (op->poll[op->cur].revents) { - memcpy(&op->poll[i], &op->poll[op->cur], sizeof(*op->poll)); - op->toev[i] = op->toev[op->cur]; - break; - } - } - } - } -#ifdef HAVE_WORKING_RTSIG - op->cur = res; -#endif - - if (!op->cur) { - op->max = INIT_MAX; - free(op->poll); - free(op->toev); - /* We just freed it, we shouldn't have a problem getting it back. */ - op->poll = malloc(sizeof(*op->poll) * op->max); - op->toev = malloc(sizeof(*op->toev) * op->max); - - if (op->poll == NULL || op->toev == NULL) - event_err(1, "%s: malloc"); - } + if (res == 1) + goto poll_for_level; + else if (res == -1) + return (-1); return (0); } + diff --git a/test/regress.c b/test/regress.c index f43e70c9..3f515838 100644 --- a/test/regress.c +++ b/test/regress.c @@ -754,16 +754,21 @@ evtag_fuzz() struct timeval tv; int i, j; + int not_failed = 0; for (j = 0; j < 100; j++) { for (i = 0; i < sizeof(buffer); i++) buffer[i] = rand(); evbuffer_drain(tmp, -1); evbuffer_add(tmp, buffer, sizeof(buffer)); - if (evtag_unmarshal_timeval(tmp, 0, &tv) != -1) { - fprintf(stderr, "evtag_unmarshal should have failed"); - exit(1); - } + if (evtag_unmarshal_timeval(tmp, 0, &tv) != -1) + not_failed++; + } + + /* The majority of decodes should fail */ + if (not_failed >= 10) { + fprintf(stderr, "evtag_unmarshal should have failed"); + exit(1); } /* Now insert some corruption into the tag length field */