Edge-Triggered mode for sc_sock (#97)

Added SC_SOCK_EDGE for edge-triggered mode
This commit is contained in:
Sergi Vladykin 2022-09-10 18:39:54 -07:00 committed by GitHub
parent 53d4a9158a
commit 611a97d9d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 338 additions and 21 deletions

View File

@ -234,6 +234,10 @@ void sc_sock_init(struct sc_sock *s, int type, bool blocking, int family)
s->fdt.type = type; s->fdt.type = type;
s->fdt.op = SC_SOCK_NONE; s->fdt.op = SC_SOCK_NONE;
s->fdt.index = -1; s->fdt.index = -1;
#if defined(_WIN32) || defined(_WIN64)
s->fdt.edge_mask = 0;
#endif
s->blocking = blocking; s->blocking = blocking;
s->family = family; s->family = family;
@ -614,6 +618,10 @@ retry:
} }
if (err == SC_EAGAIN) { if (err == SC_EAGAIN) {
#if defined(_WIN32) || defined(_WIN64)
// Stop masking WRITE event.
s->fdt.edge_mask &= ~SC_SOCK_WRITE;
#endif
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
@ -645,6 +653,10 @@ retry:
} }
if (err == SC_EAGAIN) { if (err == SC_EAGAIN) {
#if defined(_WIN32) || defined(_WIN64)
// Stop masking READ event.
s->fdt.edge_mask &= ~SC_SOCK_READ;
#endif
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
@ -1157,7 +1169,7 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
.events = EPOLLERR | EPOLLHUP | EPOLLRDHUP, .events = EPOLLERR | EPOLLHUP | EPOLLRDHUP,
}; };
if ((fdt->op & events) == events) { if (fdt->op == mask) {
return 0; return 0;
} }
@ -1178,6 +1190,10 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
ep_ev.events |= EPOLLOUT; ep_ev.events |= EPOLLOUT;
} }
if (mask & SC_SOCK_EDGE) {
ep_ev.events |= EPOLLET;
}
rc = epoll_ctl(p->fds, op, fdt->fd, &ep_ev); rc = epoll_ctl(p->fds, op, fdt->fd, &ep_ev);
if (rc != 0) { if (rc != 0) {
sc_sock_poll_set_err(p, "epoll_ctl : %s ", strerror(errno)); sc_sock_poll_set_err(p, "epoll_ctl : %s ", strerror(errno));
@ -1204,6 +1220,11 @@ int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
} }
fdt->op &= ~events; fdt->op &= ~events;
if (fdt->op == SC_SOCK_EDGE) {
fdt->op = SC_SOCK_NONE;
}
op = fdt->op == SC_SOCK_NONE ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; op = fdt->op == SC_SOCK_NONE ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
if (fdt->op & SC_SOCK_READ) { if (fdt->op & SC_SOCK_READ) {
@ -1214,6 +1235,10 @@ int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
ep_ev.events |= EPOLLOUT; ep_ev.events |= EPOLLOUT;
} }
if (fdt->op & SC_SOCK_EDGE) {
ep_ev.events |= EPOLLET;
}
rc = epoll_ctl(p->fds, op, fdt->fd, &ep_ev); rc = epoll_ctl(p->fds, op, fdt->fd, &ep_ev);
if (rc != 0) { if (rc != 0) {
sc_sock_poll_set_err(p, "epoll_ctl : %s ", strerror(errno)); sc_sock_poll_set_err(p, "epoll_ctl : %s ", strerror(errno));
@ -1355,9 +1380,9 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
{ {
int rc, count = 0; int rc, count = 0;
struct kevent ev[2]; struct kevent ev[2];
int mask = fdt->op | events; enum sc_sock_ev mask = fdt->op | events;
if ((fdt->op & events) == events) { if (fdt->op == mask) {
return 0; return 0;
} }
@ -1368,12 +1393,18 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
} }
} }
unsigned short act = EV_ADD;
if (mask & SC_SOCK_EDGE) {
act |= EV_CLEAR;
}
if (mask & SC_SOCK_WRITE) { if (mask & SC_SOCK_WRITE) {
EV_SET(&ev[count++], fdt->fd, EVFILT_WRITE, EV_ADD, 0, 0, data); EV_SET(&ev[count++], fdt->fd, EVFILT_WRITE, act, 0, 0, data);
} }
if (mask & SC_SOCK_READ) { if (mask & SC_SOCK_READ) {
EV_SET(&ev[count++], fdt->fd, EVFILT_READ, EV_ADD, 0, 0, data); EV_SET(&ev[count++], fdt->fd, EVFILT_READ, act, 0, 0, data);
} }
rc = kevent(p->fds, ev, count, NULL, 0, NULL); rc = kevent(p->fds, ev, count, NULL, 0, NULL);
@ -1391,22 +1422,24 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt, int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
enum sc_sock_ev events, void *data) enum sc_sock_ev events, void *data)
{ {
(void) data;
int rc, count = 0; int rc, count = 0;
struct kevent ev[2]; struct kevent ev[2];
int mask = fdt->op & events; enum sc_sock_ev mask = fdt->op & events;
if (mask == 0) { if (mask == 0) {
return 0; return 0;
} }
if (mask & SC_SOCK_READ) { if (mask & SC_SOCK_READ) {
EV_SET(&ev[count++], fdt->fd, EVFILT_READ, EV_DELETE, 0, 0, 0); EV_SET(&ev[count++], fdt->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
} else if ((mask & SC_SOCK_EDGE) != 0 && (fdt->op & SC_SOCK_READ) != 0) {
EV_SET(&ev[count++], fdt->fd, EVFILT_READ, EV_ADD, 0, 0, data);
} }
if (mask & SC_SOCK_WRITE) { if (mask & SC_SOCK_WRITE) {
EV_SET(&ev[count++], fdt->fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); EV_SET(&ev[count++], fdt->fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
} else if ((mask & SC_SOCK_EDGE) != 0 && (fdt->op & SC_SOCK_WRITE) != 0) {
EV_SET(&ev[count++], fdt->fd, EVFILT_WRITE, EV_ADD, 0, 0, data);
} }
rc = kevent(p->fds, ev, count, NULL, 0, NULL); rc = kevent(p->fds, ev, count, NULL, 0, NULL);
@ -1416,6 +1449,11 @@ int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
} }
fdt->op &= ~events; fdt->op &= ~events;
if (fdt->op == SC_SOCK_EDGE) {
fdt->op = SC_SOCK_NONE;
}
p->count -= fdt->op == SC_SOCK_NONE; p->count -= fdt->op == SC_SOCK_NONE;
return 0; return 0;
@ -1472,7 +1510,7 @@ int sc_sock_poll_init(struct sc_sock_poll *p)
goto err; goto err;
} }
p->data = sc_sock_malloc(sizeof(void *) * 16); p->data = sc_sock_malloc(sizeof(*p->data) * 16);
if (p->data == NULL) { if (p->data == NULL) {
goto err; goto err;
} }
@ -1513,7 +1551,7 @@ int sc_sock_poll_term(struct sc_sock_poll *p)
static int sc_sock_poll_expand(struct sc_sock_poll *p) static int sc_sock_poll_expand(struct sc_sock_poll *p)
{ {
int cap, rc = 0; int cap, rc = 0;
void **data = NULL; struct sc_sock_fd_data *data = NULL;
struct pollfd *ev = NULL; struct pollfd *ev = NULL;
if (p->count == p->cap) { if (p->count == p->cap) {
@ -1555,8 +1593,9 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
{ {
int rc; int rc;
int index = fdt->index; int index = fdt->index;
enum sc_sock_ev mask = fdt->op | events;
if ((fdt->op & events) == events) { if (fdt->op == mask) {
return 0; return 0;
} }
@ -1579,25 +1618,30 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
assert(index != -1); assert(index != -1);
p->events[index].fd = fdt->fd; p->events[index].fd = fdt->fd;
p->data[index].fdt = fdt;
fdt->index = index; fdt->index = index;
} }
assert(index != -1); assert(index != -1);
fdt->op |= events; fdt->op = mask;
p->events[fdt->index].events = 0; p->events[fdt->index].events = 0;
p->events[fdt->index].revents = 0; p->events[fdt->index].revents = 0;
if (events & SC_SOCK_READ) { if (mask & SC_SOCK_READ) {
p->events[fdt->index].events |= POLLIN; p->events[fdt->index].events |= POLLIN;
} }
if (events & SC_SOCK_WRITE) { if (mask & SC_SOCK_WRITE) {
p->events[fdt->index].events |= POLLOUT; p->events[fdt->index].events |= POLLOUT;
} }
p->data[fdt->index] = data; if (mask & SC_SOCK_EDGE) {
fdt->edge_mask |= SC_SOCK_EDGE;
}
p->data[fdt->index].data = data;
return 0; return 0;
} }
@ -1610,10 +1654,16 @@ int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
} }
fdt->op &= ~events; fdt->op &= ~events;
if (fdt->op == SC_SOCK_EDGE) {
fdt->op = SC_SOCK_NONE;
}
if (fdt->op == SC_SOCK_NONE) { if (fdt->op == SC_SOCK_NONE) {
p->events[fdt->index].fd = SC_INVALID; p->events[fdt->index].fd = SC_INVALID;
p->count--; p->count--;
fdt->index = -1; fdt->index = -1;
fdt->edge_mask = 0;
} else { } else {
p->events[fdt->index].events = 0; p->events[fdt->index].events = 0;
@ -1625,7 +1675,11 @@ int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
p->events[fdt->index].events |= POLLOUT; p->events[fdt->index].events |= POLLOUT;
} }
p->data[fdt->index] = data; if ((fdt->op & SC_SOCK_EDGE) == 0) {
fdt->edge_mask = 0;
}
p->data[fdt->index].data = data;
} }
return 0; return 0;
@ -1633,11 +1687,15 @@ int sc_sock_poll_del(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
void *sc_sock_poll_data(struct sc_sock_poll *p, int i) void *sc_sock_poll_data(struct sc_sock_poll *p, int i)
{ {
return p->data[i]; return p->data[i].data;
} }
uint32_t sc_sock_poll_event(struct sc_sock_poll *p, int i) uint32_t sc_sock_poll_event(struct sc_sock_poll *p, int i)
{ {
if (p->events[i].fd == SC_INVALID) {
return SC_SOCK_NONE;
}
uint32_t evs = 0; uint32_t evs = 0;
uint32_t poll_evs = p->events[i].revents; uint32_t poll_evs = p->events[i].revents;
@ -1649,6 +1707,15 @@ uint32_t sc_sock_poll_event(struct sc_sock_poll *p, int i)
evs |= SC_SOCK_WRITE; evs |= SC_SOCK_WRITE;
} }
// Start masking fired events in Edge-Triggered mode.
uint32_t *mask_ptr = &p->data[i].fdt->edge_mask;
uint32_t mask = *mask_ptr;
if (mask & SC_SOCK_EDGE) {
*mask_ptr |= evs;
evs &= ~mask;
}
poll_evs &= POLLHUP | POLLERR; poll_evs &= POLLHUP | POLLERR;
if (poll_evs != 0) { if (poll_evs != 0) {
evs = (SC_SOCK_READ | SC_SOCK_WRITE); evs = (SC_SOCK_READ | SC_SOCK_WRITE);

View File

@ -65,6 +65,7 @@ enum sc_sock_ev
SC_SOCK_NONE = 0u, SC_SOCK_NONE = 0u,
SC_SOCK_READ = 1u, SC_SOCK_READ = 1u,
SC_SOCK_WRITE = 2u, SC_SOCK_WRITE = 2u,
SC_SOCK_EDGE = 4u,
}; };
enum sc_sock_family enum sc_sock_family
@ -79,6 +80,9 @@ struct sc_sock_fd {
enum sc_sock_ev op; enum sc_sock_ev op;
int type; // user data int type; // user data
int index; int index;
#if defined(_WIN32) || defined(_WIN64)
uint32_t edge_mask;
#endif
}; };
struct sc_sock { struct sc_sock {
@ -335,10 +339,15 @@ struct sc_sock_poll {
#include <poll.h> #include <poll.h>
#endif #endif
struct sc_sock_fd_data {
struct sc_sock_fd *fdt;
void *data;
};
struct sc_sock_poll { struct sc_sock_poll {
int count; int count;
int cap; int cap;
void **data; struct sc_sock_fd_data *data;
struct pollfd *events; struct pollfd *events;
char err[128]; char err[128];
}; };
@ -369,6 +378,7 @@ int sc_sock_poll_term(struct sc_sock_poll *p);
* @param p poll * @param p poll
* @param fdt fdt * @param fdt fdt
* @param events SC_SOCK_READ, SC_SOCK_WRITE or SC_SOCK_READ | SC_SOCK_WRITE * @param events SC_SOCK_READ, SC_SOCK_WRITE or SC_SOCK_READ | SC_SOCK_WRITE
* SC_SOCK_EDGE for edge-triggerred mode
* @param data user data * @param data user data
* @return '0' on success, negative number on failure, * @return '0' on success, negative number on failure,
* call sc_sock_poll_err() to get error string * call sc_sock_poll_err() to get error string
@ -381,6 +391,7 @@ int sc_sock_poll_add(struct sc_sock_poll *p, struct sc_sock_fd *fdt,
* @param p poll * @param p poll
* @param fdt fdt * @param fdt fdt
* @param events SC_SOCK_READ, SC_SOCK_WRITE or SC_SOCK_READ | SC_SOCK_WRITE * @param events SC_SOCK_READ, SC_SOCK_WRITE or SC_SOCK_READ | SC_SOCK_WRITE
* SC_SOCK_EDGE to cancel edge-triggerred mode
* @param data user data * @param data user data
* @return '0' on success, negative number on failure, * @return '0' on success, negative number on failure,
* call sc_sock_poll_err() to get error string * call sc_sock_poll_err() to get error string

View File

@ -31,6 +31,7 @@ struct sc_thread {
#include <stdarg.h> #include <stdarg.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h>
#include <unistd.h> #include <unistd.h>
struct sc_thread { struct sc_thread {
@ -151,6 +152,28 @@ int sc_thread_term(struct sc_thread *thread)
return sc_thread_join(thread, NULL); return sc_thread_join(thread, NULL);
} }
int sc_time_sleep(uint64_t millis)
{
#if defined(_WIN32) || defined(_WIN64)
Sleep((DWORD) millis);
return 0;
#else
int rc;
struct timespec t, rem;
rem.tv_sec = (time_t) millis / 1000;
rem.tv_nsec = (long) (millis % 1000) * 1000000;
do {
t = rem;
rc = nanosleep(&t, &rem);
} while (rc != 0 && errno == EINTR);
return rc;
#endif
}
void *server_ip4(void *arg) void *server_ip4(void *arg)
{ {
(void) arg; (void) arg;
@ -1447,7 +1470,7 @@ void *server(void *arg)
while (!done) { while (!done) {
int count = sc_sock_poll_wait(&p, -1); int count = sc_sock_poll_wait(&p, -1);
assert(rc != -1); assert(count != -1);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
ev = sc_sock_poll_event(&p, i); ev = sc_sock_poll_event(&p, i);
@ -1568,6 +1591,221 @@ void test_poll(void)
assert(sc_thread_term(&thread2) == 0); assert(sc_thread_term(&thread2) == 0);
} }
void check_poll_empty(struct sc_sock_poll *p, int timeout)
{
sc_time_sleep(50);
int count = sc_sock_poll_wait(p, timeout);
assert(count >= 0);
for (int i = 0; i < count; i++) {
assert(0 == sc_sock_poll_event(p, i));
}
}
void test_poll_edge(void)
{
uint32_t ev;
int rc, count, found, timeout = 1;
struct sc_sock srv, clt, acc, *sock;
struct sc_sock_poll p;
rc = sc_sock_poll_init(&p);
assert(rc == 0);
sc_sock_init(&srv, 0, false, SC_SOCK_INET);
rc = sc_sock_listen(&srv, "127.0.0.1", "11000");
assert(rc == 0);
rc = sc_sock_poll_add(&p, &srv.fdt, SC_SOCK_READ, &srv);
assert(rc == 0);
check_poll_empty(&p, timeout);
sc_sock_init(&clt, 0, false, SC_SOCK_INET);
rc = sc_sock_connect(&clt, "127.0.0.1", "11000", NULL, NULL);
if (rc == -1) {
assert(errno == EAGAIN);
} else {
assert(rc == 0);
}
rc = sc_sock_poll_add(&p, &clt.fdt,
SC_SOCK_READ | SC_SOCK_WRITE | SC_SOCK_EDGE,
&clt);
assert(rc == 0);
sc_time_sleep(50);
count = sc_sock_poll_wait(&p, timeout);
assert(count >= 2);
found = 0;
for (int i = 0; i < count; i++) {
ev = sc_sock_poll_event(&p, i);
sock = sc_sock_poll_data(&p, i);
if (ev == 0) {
continue;
}
if (sock == &srv) {
if (ev & SC_SOCK_READ) {
rc = sc_sock_accept(&srv, &acc);
assert(rc == 0);
}
if (ev & SC_SOCK_WRITE) {
assert(false);
}
found++;
} else if (sock == &clt) {
if (ev & SC_SOCK_WRITE) {
rc = sc_sock_finish_connect(&clt);
assert(rc == 0);
}
if (ev & SC_SOCK_READ) {
assert(false);
}
found++;
} else {
assert(false);
}
}
assert(found == 2);
rc = sc_sock_poll_add(&p, &acc.fdt,
SC_SOCK_READ | SC_SOCK_WRITE | SC_SOCK_EDGE,
&acc);
assert(rc == 0);
sc_time_sleep(50);
count = sc_sock_poll_wait(&p, timeout);
assert(count >= 1);
found = 0;
for (int i = 0; i < count; i++) {
ev = sc_sock_poll_event(&p, i);
sock = sc_sock_poll_data(&p, i);
if (ev == 0) {
continue;
}
assert(sock == &acc);
assert(ev == SC_SOCK_WRITE);
found++;
}
assert(found == 1);
check_poll_empty(&p, timeout);
int total_w = 0;
int total_r = 0;
for (;;) {
int w = sc_sock_send(&acc, "blaBLA", 7, 0);
if (w < 0) {
assert(errno == EAGAIN);
break;
}
total_w += w;
}
do {
sc_time_sleep(50);
count = sc_sock_poll_wait(&p, timeout);
assert(count >= 1);
found = 0;
for (int i = 0; i < count; i++) {
ev = sc_sock_poll_event(&p, i);
sock = sc_sock_poll_data(&p, i);
if (ev == 0) {
continue;
}
if (sock == &clt) {
assert(ev & SC_SOCK_READ);
found++;
} else if (sock == &acc) {
assert(ev & SC_SOCK_WRITE);
found++;
} else {
assert(false);
}
}
assert(found == 1 || found == 2);
char rb;
for (;;) {
int r = sc_sock_recv(&clt, &rb, 1, 0);
if (r < 0) {
assert(errno == EAGAIN);
break;
}
total_r += r;
}
} while (total_r < total_w);
assert(total_r == total_w);
sc_time_sleep(50);
count = sc_sock_poll_wait(&p, timeout);
assert(count >= 1);
found = 0;
for (int i = 0; i < count; i++) {
ev = sc_sock_poll_event(&p, i);
sock = sc_sock_poll_data(&p, i);
if (ev == 0) {
continue;
}
assert(sock == &clt || sock == &acc);
assert(ev == SC_SOCK_WRITE);
found++;
}
assert(found == 1 || found == 2);
check_poll_empty(&p, timeout);
assert(srv.fdt.op == SC_SOCK_READ);
assert(sc_sock_poll_del(&p, &srv.fdt, SC_SOCK_READ, NULL) == 0);
assert(srv.fdt.op == SC_SOCK_NONE);
assert(acc.fdt.op == (SC_SOCK_WRITE | SC_SOCK_READ | SC_SOCK_EDGE));
assert(sc_sock_poll_del(&p, &acc.fdt, SC_SOCK_READ | SC_SOCK_EDGE,
NULL) == 0);
assert(acc.fdt.op == SC_SOCK_WRITE);
assert(sc_sock_poll_add(&p, &acc.fdt, SC_SOCK_EDGE, NULL) == 0);
assert(acc.fdt.op == (SC_SOCK_WRITE | SC_SOCK_EDGE));
assert(sc_sock_poll_del(&p, &acc.fdt, SC_SOCK_WRITE, NULL) == 0);
assert(acc.fdt.op == SC_SOCK_NONE);
assert(clt.fdt.op == (SC_SOCK_READ | SC_SOCK_WRITE | SC_SOCK_EDGE));
assert(sc_sock_poll_del(&p, &clt.fdt, SC_SOCK_EDGE, NULL) == 0);
assert(clt.fdt.op == (SC_SOCK_READ | SC_SOCK_WRITE));
assert(sc_sock_poll_del(&p, &clt.fdt, SC_SOCK_READ | SC_SOCK_WRITE,
NULL) == 0);
assert(clt.fdt.op == SC_SOCK_NONE);
assert(sc_sock_poll_add(&p, &acc.fdt, SC_SOCK_READ, NULL) == 0);
assert(sc_sock_poll_add(&p, &acc.fdt, SC_SOCK_WRITE, NULL) == 0);
assert(sc_sock_poll_add(&p, &acc.fdt, SC_SOCK_EDGE, NULL) == 0);
assert(acc.fdt.op == (SC_SOCK_READ | SC_SOCK_WRITE | SC_SOCK_EDGE));
assert(sc_sock_poll_del(&p, &acc.fdt, SC_SOCK_READ, NULL) == 0);
assert(acc.fdt.op == (SC_SOCK_WRITE | SC_SOCK_EDGE));
assert(sc_sock_term(&srv) == 0);
assert(sc_sock_term(&acc) == 0);
assert(sc_sock_term(&clt) == 0);
assert(sc_sock_poll_term(&p) == 0);
}
void test_err(void) void test_err(void)
{ {
struct sc_sock sock; struct sc_sock sock;
@ -1634,6 +1872,7 @@ int main(void)
test_poll(); test_poll();
test_err(); test_err();
test_poll_mass(); test_poll_mass();
test_poll_edge();
assert(sc_sock_cleanup() == 0); assert(sc_sock_cleanup() == 0);