/**
	BSD kqueue based event driver implementation.

	Kqueue is an efficient API for asynchronous I/O on BSD flavors, including
	OS X/macOS, suitable for large numbers of concurrently open sockets.
*/
module eventcore.drivers.posix.kqueue;
@safe: /*@nogc:*/ nothrow:

version (FreeBSD) enum have_kqueue = true;
else version (DragonFlyBSD) enum have_kqueue = true;
else version (OSX) enum have_kqueue = true;
else enum have_kqueue = false;

static if (have_kqueue):

public import eventcore.drivers.posix.driver;
import eventcore.internal.utils;

import core.time : Duration;
import core.sys.posix.sys.time : timespec, time_t;

version (OSX) import core.sys.darwin.sys.event;
else version (FreeBSD) import core.sys.freebsd.sys.event;
else version (DragonFlyBSD) import core.sys.dragonflybsd.sys.event;
else static assert(false, "Kqueue not supported on this OS.");


import core.sys.linux.epoll;


/// Event driver that uses `KqueueEventLoop` as its event loop.
alias KqueueEventDriver = PosixEventDriver!KqueueEventLoop;

/** Concrete kqueue event loop.

	Implements `doProcessEvents` by forwarding to
	`KqueueEventLoopBase.doProcessEventsBase`.
*/
final class KqueueEventLoop : KqueueEventLoopBase {
	override bool doProcessEvents(Duration timeout)
	@trusted {
		return doProcessEventsBase(timeout);
	}
}


/** Shared kqueue logic: owns the kqueue descriptor, batches filter changes
	and translates kernel events into `notify` calls.

	Changes requested through `registerFD`, `unregisterFD` and `updateFD` are
	collected in `m_changes`. They reach the kernel either with the next
	`kevent` call made by `doProcessEventsBase`, or earlier when the buffer
	fills up (see `putChange`).
*/
abstract class KqueueEventLoopBase : PosixEventLoop {
	protected {
		/// Descriptor returned by `kqueue()`.
		int m_queue;
		/// Number of valid entries at the front of `m_changes`.
		size_t m_changeCount = 0;
		/// Pending changes that have not been submitted yet.
		kevent_t[100] m_changes;
		/// Receives the events returned by a single `kevent` call.
		kevent_t[100] m_events;
	}

	/// Creates the kqueue descriptor.
	this()
	@safe nothrow @nogc {
		m_queue = () @trusted { return kqueue(); } ();
		assert(m_queue >= 0, "Failed to create kqueue.");
	}

	/** Submits all pending changes and waits for events.

		Params:
			timeout = Maximum time to wait. `Duration.max` waits without a
				time limit; negative values are treated as zero.

		Returns:
			`true` if at least one event was returned or if `kevent` failed,
			`false` if the call returned without any events.
	*/
	protected bool doProcessEventsBase(Duration timeout)
	@trusted nothrow {
		import core.stdc.errno : ENOENT;
		//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");

		// A negative time limit is not a valid timespec for kevent; poll
		// once instead of handing the kernel an invalid value.
		if (timeout < Duration.zero) timeout = Duration.zero;

		//print("wait %s", m_events.length);
		timespec ts;
		long secs, hnsecs;
		timeout.split!("seconds", "hnsecs")(secs, hnsecs);
		ts.tv_sec = cast(time_t)secs;
		// hnsecs are units of 100 ns
		ts.tv_nsec = cast(typeof(ts.tv_nsec))(hnsecs * 100);

		auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changeCount,
			m_events.ptr, cast(int)m_events.length,
			timeout == Duration.max ? null : &ts);
		// the pending changes have been handed to the kernel
		m_changeCount = 0;

		//print("kevent returned %s", ret);

		if (ret > 0) {
			foreach (ref evt; m_events[0 .. ret]) {
				//print("event %s %s", evt.ident, evt.filter, evt.flags);
				assert(evt.ident <= uint.max);

				// EV_ERROR with ENOENT is the receipt for a change that
				// referred to a filter which is not registered (e.g. an
				// EV_DELETE queued by unregisterFD for a filter that was
				// never added). It says nothing about the descriptor's
				// state, so it must not be dispatched.
				if ((evt.flags & EV_ERROR) && evt.data == ENOENT)
					continue;

				auto fd = cast(size_t)evt.ident;
				if (evt.flags & (EV_EOF|EV_ERROR))
					notify!(EventType.status)(fd);
				switch (evt.filter) {
					default: break;
					case EVFILT_READ: notify!(EventType.read)(fd); break;
					case EVFILT_WRITE: notify!(EventType.write)(fd); break;
				}
				// EV_SIGNAL, EV_TIMEOUT
			}
			return true;
		} else {
			// NOTE: In particular, EINTR needs to cause true to be returned
			//       here, so that user code has a chance to handle any effects
			//       of the signal handler before waiting again.
			//
			//       Other errors are very likely to reoccur for the next
			//       loop iteration, so there is no value in attempting to
			//       wait again.
			return ret < 0;
		}
	}

	/// Disposes the base class state and closes the kqueue descriptor.
	override void dispose()
	{
		super.dispose();
		import core.sys.posix.unistd : close;
		close(m_queue);
	}

	/** Queues the addition of the read and/or write filter for `fd`.

		Params:
			fd = Descriptor to watch.
			mask = Selects which filters to add; only `EventMask.read` and
				`EventMask.write` are mapped to kqueue filters.
			edge_triggered = If set, the filters are added with `EV_CLEAR`.
	*/
	override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
	{
		//print("register %s %s", fd, mask);
		kevent_t ev;
		ev.ident = fd;
		ev.flags = EV_ADD|EV_ENABLE;
		if (edge_triggered) ev.flags |= EV_CLEAR;
		if (mask & EventMask.read) {
			ev.filter = EVFILT_READ;
			putChange(ev);
		}
		if (mask & EventMask.write) {
			ev.filter = EVFILT_WRITE;
			putChange(ev);
		}
		//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
	}

	/** Queues the removal of the read and/or write filter for `fd`.

		A kqueue registration is identified by the (ident, filter) pair, so
		one `EV_DELETE` change is queued per filter named in `mask`.

		Params:
			fd = Descriptor to stop watching.
			mask = Selects which filters to remove.
				NOTE(review): presumably the mask the descriptor was
				registered with - confirm against the callers.
	*/
	override void unregisterFD(FD fd, EventMask mask)
	{
		kevent_t ev;
		ev.ident = fd;
		ev.flags = EV_DELETE;
		if (mask & EventMask.read) {
			ev.filter = EVFILT_READ;
			putChange(ev);
		}
		if (mask & EventMask.write) {
			ev.filter = EVFILT_WRITE;
			putChange(ev);
		}
	}

	/** Queues the changes needed to go from `old_mask` to `new_mask`.

		Only filters whose bit differs between the two masks are touched: a
		bit that is set in `new_mask` results in `EV_ADD`, a cleared one in
		`EV_DELETE`.

		Params:
			fd = Descriptor whose filters are changed.
			old_mask = Mask that is currently in effect.
			new_mask = Mask that should be in effect afterwards.
			edge_triggered = If set, `EV_CLEAR` is added to the change flags.
	*/
	override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true)
	{
		//print("update %s %s", fd, mask);
		kevent_t ev;
		// Without the ident every change would refer to descriptor 0.
		ev.ident = fd;
		auto changes = old_mask ^ new_mask;

		if (changes & EventMask.read) {
			ev.filter = EVFILT_READ;
			ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE;
			if (edge_triggered) ev.flags |= EV_CLEAR;
			putChange(ev);
		}

		if (changes & EventMask.write) {
			ev.filter = EVFILT_WRITE;
			ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE;
			if (edge_triggered) ev.flags |= EV_CLEAR;
			putChange(ev);
		}

		//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
	}

	/** Appends `ev` to the pending changes.

		Once the buffer is full, all pending changes are submitted right away
		with a `kevent` call that does not request any events.
	*/
	private void putChange(ref kevent_t ev)
	@safe nothrow @nogc {
		m_changes[m_changeCount++] = ev;
		if (m_changeCount == m_changes.length) {
			auto ret = (() @trusted => kevent(m_queue, &m_changes[0], cast(int)m_changes.length, null, 0, null)) ();
			// NOTE(review): the result is only checked by this assert, so a
			// failed submission goes unnoticed when asserts are disabled.
			assert(ret == 0);
			m_changeCount = 0;
		}
	}
}