/**
	Linux epoll based event driver implementation.

	Epoll is an efficient API for asynchronous I/O on Linux, suitable for large
	numbers of concurrently open sockets.
*/
module eventcore.drivers.posix.epoll;
@safe @nogc nothrow:

version (linux):

public import eventcore.drivers.posix.driver;
import eventcore.internal.utils;

import core.time : Duration;
import core.sys.posix.sys.time : timeval;
import core.sys.linux.epoll;

alias EpollEventDriver = PosixEventDriver!EpollEventLoop;

// Fallback definition, kept for backwards compatibility with code that may
// reference it through this module.
static if (!is(typeof(SOCK_CLOEXEC)))
	enum SOCK_CLOEXEC = 0x80000;

/**
	Event loop implementation that multiplexes file descriptors with epoll.

	A single epoll instance is created on construction and closed again in
	`dispose`. Up to `m_events.length` events are fetched per call to
	`doProcessEvents`.
*/
final class EpollEventLoop : PosixEventLoop {
@safe nothrow:

	private {
		// File descriptor of the epoll instance; set to -1 once disposed.
		int m_epoll;
		// Buffer that receives the events reported by a single epoll_wait call.
		epoll_event[100] m_events;
	}

	/// Creates the underlying epoll instance with the close-on-exec flag set.
	this()
	@nogc {
		assumeSafeNoGC({
			// EPOLL_CLOEXEC is the flag documented for epoll_create1. The
			// previously used SOCK_CLOEXEC only worked because it happens to
			// share the same numeric value.
			// NOTE(review): the result is not checked; on failure m_epoll is
			// -1 and all subsequent epoll calls will fail.
			m_epoll = epoll_create1(EPOLL_CLOEXEC);
		});
	}

	/**
		Waits for events and dispatches them via `notify`.

		Params:
			timeout = Maximum time to wait. The value is rounded up to whole
				milliseconds; negative values are treated as zero. Values that
				do not fit into an `int` number of milliseconds (including
				`Duration.max`) result in an unbounded wait.

		Returns:
			`true` if at least one event was reported or if `epoll_wait`
			failed, `false` if the wait ended without any events.
	*/
	override bool doProcessEvents(Duration timeout)
	@trusted {
		import std.algorithm : max;
		//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");

		debug (EventCoreEpollDebug) print("Epoll wait %s, %s", m_events.length, timeout);
		long tomsec;
		if (timeout == Duration.max) tomsec = long.max;
		// 10_000 hnsecs == 1 ms; adding 9999 rounds up to the next full ms
		else tomsec = max((timeout.total!"hnsecs" + 9999) / 10_000, 0);
		// a timeout of -1 makes epoll_wait block indefinitely
		auto ret = epoll_wait(m_epoll, m_events.ptr, cast(int)m_events.length, tomsec > int.max ? -1 : cast(int)tomsec);
		debug (EventCoreEpollDebug) print("Epoll wait done: %s", ret);

		if (ret > 0) {
			foreach (ref evt; m_events[0 .. ret]) {
				debug (EventCoreEpollDebug) print("Epoll event on %s: %s", evt.data.fd, evt.events);
				auto fd = cast(size_t)evt.data.fd;
				if (evt.events & (EPOLLERR|EPOLLHUP|EPOLLRDHUP)) notify!(EventType.status)(fd);
				if (evt.events & EPOLLIN) notify!(EventType.read)(fd);
				if (evt.events & EPOLLOUT) notify!(EventType.write)(fd);
			}
			return true;
		} else {
			// NOTE: In particular, EINTR needs to cause true to be returned
			//       here, so that user code has a chance to handle any effects
			//       of the signal handler before waiting again.
			//
			//       Other errors are very likely to reoccur for the next
			//       loop iteration, so there is no value in attempting to
			//       wait again.
			return ret < 0;
		}
	}

	/**
		Disposes the base class state and closes the epoll instance.

		The descriptor is invalidated after closing, so that a repeated call
		cannot close an unrelated descriptor that reused the same number.
	*/
	override void dispose()
	@nogc {
		import core.sys.posix.unistd : close;
		super.dispose();
		if (m_epoll >= 0) {
			close(m_epoll);
			m_epoll = -1;
		}
	}

	/**
		Adds a file descriptor to the epoll instance.

		Params:
			fd = The descriptor to register
			mask = Selects which event categories (read/write/status) to watch
			edge_triggered = Requests edge-triggered (`EPOLLET`) notification
	*/
	override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
	{
		debug (EventCoreEpollDebug) print("Epoll register FD %s: %s", fd, mask);
		auto ev = makeEvent(fd, mask, edge_triggered);
		assumeSafeNoGC({
			epoll_ctl(m_epoll, EPOLL_CTL_ADD, cast(int)fd, &ev);
		});
	}

	/**
		Removes a file descriptor from the epoll instance.

		Params:
			fd = The descriptor to remove
			mask = Not used by this implementation
	*/
	override void unregisterFD(FD fd, EventMask mask)
	{
		debug (EventCoreEpollDebug) print("Epoll unregister FD %s", fd);
		assumeSafeNoGC({
			epoll_ctl(m_epoll, EPOLL_CTL_DEL, cast(int)fd, null);
		});
	}

	/**
		Replaces the set of events watched for an already registered descriptor.

		Params:
			fd = The descriptor to modify
			old_mask = Not used by this implementation
			mask = The new set of event categories to watch
			edge_triggered = Requests edge-triggered (`EPOLLET`) notification
	*/
	override void updateFD(FD fd, EventMask old_mask, EventMask mask, bool edge_triggered = true)
	{
		debug (EventCoreEpollDebug) print("Epoll update FD %s: %s", fd, mask);
		auto ev = makeEvent(fd, mask, edge_triggered);
		assumeSafeNoGC({
			epoll_ctl(m_epoll, EPOLL_CTL_MOD, cast(int)fd, &ev);
		});
	}

	// Translates an event mask into the epoll_event structure that is shared
	// by registerFD and updateFD; the descriptor is stored as the user data.
	private static epoll_event makeEvent(FD fd, EventMask mask, bool edge_triggered)
	@nogc {
		epoll_event ev;
		if (edge_triggered) ev.events |= EPOLLET;
		//ev.events = EPOLLONESHOT;
		if (mask & EventMask.read) ev.events |= EPOLLIN;
		if (mask & EventMask.write) ev.events |= EPOLLOUT;
		if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLHUP|EPOLLRDHUP;
		ev.data.fd = cast(int)fd;
		return ev;
	}
}

/// Splits a duration into the seconds/microseconds fields of a `timeval`.
private timeval toTimeVal(Duration dur)
{
	timeval tvdur;
	dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
	return tvdur;
}

/// Invokes `doit` after casting it to a `@nogc` delegate, so that it can be
/// called from `@safe @nogc` code. The caller is responsible for ensuring
/// that the delegate is actually memory safe and does not allocate.
private void assumeSafeNoGC(scope void delegate() nothrow doit)
@trusted {
	(cast(void delegate() nothrow @nogc)doit)();
}