module eventcore.drivers.posix.events;
@safe:

import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils : nogc_assert, mallocT, freeT;


version (linux) {
	nothrow @nogc extern (C) int eventfd(uint initval, int flags);
	import core.sys.linux.sys.eventfd : EFD_NONBLOCK, EFD_CLOEXEC;
}
version (Posix) {
	import core.sys.posix.unistd : close, read, write;
} else {
	import core.sys.windows.winsock2 : closesocket, AF_INET, SOCKET, SOCK_DGRAM,
		bind, connect, getsockname, send, socket;
}


/** Event (`EventID`) implementation for the Posix style event loops.

	On Linux, each event is backed by an `eventfd` descriptor that is
	registered for read events with the loop. On all other platforms a pair
	of datagram sockets is used instead: one end is handed to the socket
	driver as the receiver, the other end doubles as the event ID and is
	written to by `trigger(...) shared`.

	Params:
		Loop = Event loop type providing the FD slot management
		Sockets = Socket driver used for the socket pair based implementation
*/
final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
@safe: /*@nogc:*/ nothrow:
	private {
		Loop m_loop;
		Sockets m_sockets;
		// receive buffer for the notification datagrams (socket based implementation)
		ubyte[ulong.sizeof] m_buf;
	}

	/// Stores the loop and socket driver that all events of this driver use.
	this(Loop loop, Sockets sockets)
	@nogc {
		m_loop = loop;
		m_sockets = sockets;
	}

	/// The event loop this driver operates on.
	package @property Loop loop() { return m_loop; }

	/** Creates a new, non-internal event.

		Returns: The ID of the new event, or `EventID.invalid` on failure.
	*/
	final override EventID create()
	{
		return createInternal(false);
	}

	/** Creates a new event.

		Params:
			is_internal = Stored in the event's slot; waiters of internal
				events are not counted in the loop's waiter count

		Returns:
			The ID of the new event with a reference count of one, or
			`EventID.invalid` if the underlying descriptors could not be
			created.
	*/
	package(eventcore) EventID createInternal(bool is_internal = true)
	@nogc {
		version (linux) {
			auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
			if (eid == -1) return EventID.invalid;
			// FIXME: avoid dynamic memory allocation for the queue
			auto id = m_loop.initFD!EventID(eid, FDFlags.internal,
				EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal));
			m_loop.registerFD(id, EventMask.read);
			m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
			releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
			assert(getRC(id) == 1);
			return id;
		} else {
			sock_t[2] fd;
			version (Posix) {
				import core.sys.posix.fcntl : fcntl, F_SETFD, FD_CLOEXEC;

				// create a pair of sockets to communicate between threads
				import core.sys.posix.sys.socket : SOCK_DGRAM, AF_UNIX, socketpair;
				if (() @trusted { return socketpair(AF_UNIX, SOCK_DGRAM, 0, fd); } () != 0)
					return EventID.invalid;

				assert(fd[0] != fd[1]);

				// use the first socket as the async receiver
				// NOTE(review): the result is not checked for
				// DatagramSocketFD.invalid here - confirm whether adoption can fail
				auto s = m_sockets.adoptDatagramSocketInternal(fd[0], true, true);

				// close-on-exec is a file descriptor flag and thus has to be
				// set using F_SETFD/FD_CLOEXEC (F_SETFL only handles file
				// status flags)
				() @trusted { fcntl(fd[1], F_SETFD, FD_CLOEXEC); } ();
			} else {
				// fake missing socketpair support on Windows
				import std.socket : InternetAddress;
				scope addr = new InternetAddress(0x7F000001, 0);
				auto s = m_sockets.createDatagramSocketInternal(addr, null, DatagramCreateOptions.none, true);
				if (s == DatagramSocketFD.invalid) return EventID.invalid;
				fd[0] = cast(sock_t)s;
				if (!() @trusted {
					fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
					int nl = addr.nameLen;
					if (bind(fd[1], addr.name, addr.nameLen) != 0)
						return false;
					assert(nl == addr.nameLen);
					// connect the sending socket to the address of the receiver
					if (getsockname(fd[0], addr.name, &nl) != 0)
						return false;
					if (connect(fd[1], addr.name, addr.nameLen) != 0)
						return false;
					return true;
				} ())
				{
					// don't leak the sending socket if the pair could not be set up
					() @trusted { closesocket(cast(SOCKET)fd[1]); } ();
					m_sockets.releaseRef(s);
					return EventID.invalid;
				}
			}

			m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);

			// use the second socket as the event ID and as the sending end for
			// other threads
			// FIXME: avoid dynamic memory allocation for the queue
			auto id = m_loop.initFD!EventID(fd[1], FDFlags.internal,
				EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s));
			assert(getRC(id) == 1);
			// remember the event ID so that onSocketData can map the socket back to it
			try m_sockets.userData!EventID(s) = id;
			catch (Exception e) assert(false, e.msg);
			return id;
		}
	}

	/** Invokes the callbacks that are waiting on the event.

		Does nothing if `event` is not a valid handle.

		Params:
			event = The event to trigger
			notify_all = If `true`, all currently queued waiters are consumed
				and invoked, otherwise at most one waiter is invoked
	*/
	final override void trigger(EventID event, bool notify_all)
	{
		if (!isValid(event)) return;

		// make sure the event stays alive until all waiters have been notified.
		addRef(event);
		scope (exit) releaseRef(event);

		auto slot = getSlot(event);
		if (notify_all) {
			//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
			foreach (w; slot.waiters.consume) {
				//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
				if (!isInternal(event)) m_loop.m_waiterCount--;
				w(event);
			}
		} else {
			if (!slot.waiters.empty) {
				if (!isInternal(event)) m_loop.m_waiterCount--;
				slot.waiters.consumeOne()(event);
			}
		}
	}

	/** Signals the event through its underlying descriptor.

		A 64-bit count is written to the descriptor: `long.max` if
		`notify_all` is set and `1` otherwise. The value is picked up by
		`onEvent`/`onSocketData`, which in turn call the non-`shared`
		overload of `trigger`.

		Params:
			event = The event to trigger
			notify_all = Selects the count value that gets written
	*/
	final override void trigger(EventID event, bool notify_all)
	shared @trusted @nogc {
		long count = notify_all ? long.max : 1;
		//log("emitting for all threads");
		version (Posix) .write(cast(int)event, &count, count.sizeof);
		else {
			// the call must not be placed inside of the assert expression, as
			// it would get compiled out together with it in release builds
			const sent = send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0);
			assert(sent == count.sizeof);
		}
	}

	/** Queues a callback to be invoked when the event gets triggered.

		Does nothing if `event` is not a valid handle. For non-internal
		events, the loop's waiter count is incremented.

		Params:
			event = The event to wait on
			on_event = Callback to add to the event's waiter queue
	*/
	final override void wait(EventID event, EventCallback on_event)
	@nogc {
		if (!isValid(event)) return;

		if (!isInternal(event)) m_loop.m_waiterCount++;
		getSlot(event).waiters.put(on_event);
	}

	/** Removes a callback that was previously queued using `wait`.

		Does nothing if `event` is not a valid handle. For non-internal
		events, the loop's waiter count is decremented.

		Params:
			event = The event that is being waited on
			on_event = The callback to remove from the waiter queue
	*/
	final override void cancelWait(EventID event, EventCallback on_event)
	{
		if (!isValid(event)) return;

		if (!isInternal(event)) m_loop.m_waiterCount--;
		getSlot(event).waiters.removePending(on_event);
	}

	version (linux) {
		/** Read callback of the eventfd - reads the counter and triggers the
			event on this driver instance.
		*/
		private void onEvent(FD fd)
		@trusted {
			EventID event = cast(EventID)fd;
			ulong cnt;
			() @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
			// NOTE(review): the result of read() is ignored and any non-zero
			// counter is treated as "notify all", whereas onSocketData uses
			// "> 1" - confirm whether this difference is intended
			trigger(event, cnt > 0);
		}
	} else {
		/** Receive callback of the receiver socket - re-arms the receive
			operation and triggers the event that is associated with the
			socket.
		*/
		private void onSocketData(DatagramSocketFD s, IOStatus st, size_t, scope RefAddress)
		@nogc {
			// avoid infinite recursion in case of errors
			if (st == IOStatus.ok)
				m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);

			try {
				EventID evt = m_sockets.userData!EventID(s);
				// a count greater than one is interpreted as "notify all"
				scope doit = {
					trigger(evt, (cast(long[])m_buf)[0] > 1);
				}; // cast to nogc
				() @trusted { (cast(void delegate() @nogc)doit)(); } ();
			} catch (Exception e) assert(false, e.msg);
		}
	}

	/** Determines whether a handle refers to a live slot of the loop.

		Returns:
			`true` iff the handle's value is within the bounds of the loop's
			slot array and its validation counter matches the one stored in
			the slot.
	*/
	override bool isValid(EventID handle)
	const {
		if (handle.value >= m_loop.m_fds.length) return false;
		return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
	}

	/** Increments the reference count of the event.

		Does nothing if `descriptor` is not a valid handle.
	*/
	final override void addRef(EventID descriptor)
	{
		if (!isValid(descriptor)) return;

		assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD.");
		getRC(descriptor)++;
	}

	/** Decrements the reference count of the event and destroys it once the
		count reaches zero.

		On destruction, the waiter queue is freed, the slot is cleared and
		the underlying descriptor(s) are closed.

		Returns:
			`false` if the last reference was released and the event has been
			destroyed, `true` otherwise (including for invalid handles).
	*/
	final override bool releaseRef(EventID descriptor)
	@nogc {
		if (!isValid(descriptor)) return true;

		nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
		if (--getRC(descriptor) == 0) {
			// waiters that are still queued do not count as active waiters anymore
			if (!isInternal(descriptor))
		 		m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
			() @trusted nothrow {
				try freeT(getSlot(descriptor).waiters);
				catch (Exception e) nogc_assert(false, e.msg);
			} ();
			version (linux) {
				m_loop.unregisterFD(descriptor, EventMask.read);
			} else {
				auto rs = getSlot(descriptor).recvSocket;
				m_sockets.cancelReceive(rs);
				m_sockets.releaseRef(rs);
			}
			m_loop.clearFD!EventSlot(descriptor);
			version (Posix) close(cast(int)descriptor);
			else () @trusted { closesocket(cast(SOCKET)descriptor); } ();
			return false;
		}
		return true;
	}

	/** Forwards to the loop's `rawUserDataImpl`.

		Returns: `null` if `descriptor` is not a valid handle.
	*/
	final protected override void* rawUserData(EventID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		if (!isValid(descriptor)) return null;
		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/// Returns a pointer to the event specific part of the loop's slot for `id`.
	private EventSlot* getSlot(EventID id)
	@nogc {
		nogc_assert(id < m_loop.m_fds.length, "Invalid event ID.");
		return () @trusted @nogc { return &m_loop.m_fds[id].event(); } ();
	}

	/// Returns a reference to the reference count stored in the loop's slot for `id`.
	private ref uint getRC(EventID id)
	{
		return m_loop.m_fds[id].common.refCount;
	}

	/// Returns the `isInternal` flag that the event was created with.
	private bool isInternal(EventID id)
	@nogc {
		return getSlot(id).isInternal;
	}
}

/// Per-event state stored in the event loop's slot of the event.
package struct EventSlot {
	alias Handle = EventID;
	/// Callbacks queued through `wait` (allocated using `mallocT` on creation)
	ConsumableQueue!EventCallback waiters;
	/// Waiters of internal events are not included in the loop's waiter count
	bool isInternal;
	version (linux) {}
	else {
		/// Receiving end of the socket pair that backs the event
		DatagramSocketFD recvSocket;
	}
}