/**
	Base class for BSD socket based driver implementations.

	See_also: `eventcore.drivers.select`, `eventcore.drivers.epoll`, `eventcore.drivers.kqueue`
*/
module eventcore.drivers.posix.driver;
@safe: /*@nogc:*/ nothrow:

public import eventcore.driver;
import eventcore.drivers.posix.dns;
import eventcore.drivers.posix.events;
import eventcore.drivers.posix.signals;
import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.timer;
import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue;
import eventcore.internal.utils;

import core.time : MonoTime;
import std.algorithm.comparison : among, min, max;
import std.format : format;

version (Posix) {
	package alias sock_t = int;
}
version (Windows) {
	package alias sock_t = size_t;
}

/// Returns `Clock.currStdTime`; any failure while querying the clock is
/// turned into an assertion failure so that the function can be `nothrow`.
private long currStdTime()
{
	import std.datetime : Clock;
	scope (failure) assert(false);
	return Clock.currStdTime;
}

/** Event driver that aggregates the individual sub drivers (core, events,
	signals, timers, sockets, DNS, files, pipes, watchers and processes)
	operating on a single `Loop` instance.

	The concrete sub driver types are selected at compile time through the
	`version` conditions in the private alias section.
*/
final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
@safe: /*@nogc:*/ nothrow:


	private {
		alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver);
		alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
		version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
		else alias SignalsDriver = DummyEventDriverSignals!Loop;
		alias TimerDriver = LoopTimeoutTimerDriver;
		alias SocketsDriver = PosixEventDriverSockets!Loop;
		version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
		else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
		else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
		alias FileDriver = ThreadedFileEventDriver!EventsDriver;
		version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
		else alias PipeDriver = DummyEventDriverPipes!Loop;
		version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
		else version (EventcoreCFRunLoopDriver) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
		else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
		version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop;
		else alias ProcessDriver = DummyEventDriverProcesses!Loop;

		Loop m_loop;
		CoreDriver m_core;
		EventsDriver m_events;
		SignalsDriver m_signals;
		LoopTimeoutTimerDriver m_timers;
		SocketsDriver m_sockets;
		DNSDriver m_dns;
		FileDriver m_files;
		PipeDriver m_pipes;
		WatcherDriver m_watchers;
		ProcessDriver m_processes;
	}

	/// Allocates the loop and all sub drivers using `mallocT`.
	///
	/// The order matters: each sub driver is created only after the objects
	/// that are passed to its constructor exist.
	this()
	@nogc @trusted {
		m_loop = mallocT!Loop;
		m_sockets = mallocT!SocketsDriver(m_loop);
		m_events = mallocT!EventsDriver(m_loop, m_sockets);
		m_signals = mallocT!SignalsDriver(m_loop);
		m_timers = mallocT!TimerDriver;
		m_pipes = mallocT!PipeDriver(m_loop);
		m_processes = mallocT!ProcessDriver(m_loop, this);
		m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
		m_dns = mallocT!DNSDriver(m_events, m_signals);
		m_files = mallocT!FileDriver(m_events);
		m_watchers = mallocT!WatcherDriver(m_events);
	}

	// force overriding these in the (final) sub classes to avoid virtual calls
	final override @property inout(CoreDriver) core() inout { return m_core; }
	final override @property shared(inout(CoreDriver)) core() shared inout { return m_core; }
	final override @property inout(EventsDriver) events() inout { return m_events; }
	final override @property shared(inout(EventsDriver)) events() shared inout { return m_events; }
	final override @property inout(SignalsDriver) signals() inout { return m_signals; }
	final override @property inout(TimerDriver) timers() inout { return m_timers; }
	final override @property inout(SocketsDriver) sockets() inout { return m_sockets; }
	final override @property inout(DNSDriver) dns() inout { return m_dns; }
	final override @property inout(FileDriver) files() inout { return m_files; }
	final override @property inout(PipeDriver) pipes() inout { return m_pipes; }
	final override @property inout(WatcherDriver) watchers() inout { return m_watchers; }
	final override @property inout(ProcessDriver) processes() inout { return m_processes; }

	/** Tears down the driver and frees all sub drivers.

		Before tearing anything down, every slot of the loop that is neither
		empty nor flagged `FDFlags.internal` is reported using `print`
		(stream sockets are only reported while in the `connected` state).

		Returns:
			`true` if `m_loop` is already null or if the driver was torn down,
			`false` if the loop still counts non-internal handles
			(`m_handleCount > 0`), in which case nothing is freed.
	*/
	final override bool dispose()
	{
		import core.thread : Thread;
		import taggedalgebraic : hasType;

		// NOTE(review): this guard only protects against repeated calls if
		// `freeT` resets the reference it is given - confirm in
		// `eventcore.internal.utils`.
		if (!m_loop) return true;

		static string getThreadName()
		{
			string thname;
			try thname = Thread.getThis().name;
			catch (Exception e) assert(false, e.msg);
			return thname.length ? thname : "unknown";
		}

		bool hasPrintedHeader;
		foreach (id, ref s; m_loop.m_fds) {
			if (!s.specific.hasType!(typeof(null)) && !(s.common.flags & FDFlags.internal)
				&& (!s.specific.hasType!(StreamSocketSlot) || s.streamSocket.state == ConnectionState.connected))
			{
				// the warning header is printed once, ahead of the first leaked handle
				if (!hasPrintedHeader) {
					print("Warning (thread: %s): leaking eventcore driver because there are still active handles", getThreadName());
					hasPrintedHeader = true;
				}
				print(" FD %s (%s)", id, s.specific.kind);
				debug (EventCoreLeakTrace) {
					import std.array : replace;
					string origin_str = s.common.origin.toString();
					print(" Created by;\n %s",
						origin_str.replace("\n","\n "));
				}
			}
		}
		debug (EventCoreLeakTrace) {}
		else {
			if (hasPrintedHeader)
				print("Use '-debug=EventCoreLeakTrace' to show where the instantiation happened");
		}

		// keep everything alive as long as user visible handles exist
		if (m_loop.m_handleCount > 0)
			return false;

		m_processes.dispose();
		m_files.dispose();
		m_dns.dispose();
		m_core.dispose();
		m_loop.dispose();

		try () @trusted {
				freeT(m_processes);
				freeT(m_watchers);
				freeT(m_pipes);
				freeT(m_files);
				freeT(m_dns);
				freeT(m_core);
				freeT(m_timers);
				freeT(m_signals);
				freeT(m_events);
				freeT(m_sockets);
				freeT(m_loop);
			} ();
		catch (Exception e) assert(false, e.msg);

		return true;
	}
}


/** Core driver implementation: runs the event processing loop on top of
	`Loop.doProcessEvents`, processes timers and executes callbacks that
	were queued from other threads using `runInOwnerThread`.
*/
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses) : EventDriverCore {
@safe nothrow:
	import core.atomic : atomicLoad, atomicStore;
	import core.sync.mutex : Mutex;
	import core.time : Duration;
	import std.stdint : intptr_t;
	import std.typecons : Tuple, tuple;

	protected alias ExtraEventsCallback = bool delegate(long);

	private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams);

	private {
		Loop m_loop;
		Timers m_timers;
		Events m_events;
		Processes m_processes;
		bool m_exit = false;
		EventID m_wakeupEvent;

		shared Mutex m_threadCallbackMutex;
		ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
	}

	/// Stores the passed sub drivers, creates the internal wake-up event and
	/// allocates the mutex and queue used for cross-thread callbacks.
	this(Loop loop, Timers timers, Events events, Processes processes)
	@nogc {
		m_loop = loop;
		m_timers = timers;
		m_events = events;
		m_processes = processes;
		m_wakeupEvent = events.createInternal();
		m_threadCallbackMutex = mallocT!(shared(Mutex));
		m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
		m_threadCallbacks.reserve(1000);
	}

	/// Executes all still queued thread callbacks, releases the wake-up
	/// event and frees the callback mutex and queue.
	final void dispose()
	{
		executeThreadCallbacks();
		m_events.releaseRef(m_wakeupEvent);
		m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized!
		try {
			() @trusted { freeT(m_threadCallbackMutex); } ();
			() @trusted { freeT(m_threadCallbacks); } ();
		} catch (Exception e) assert(false, e.msg);
	}

	/// Sum of the loop's waiter count and the pending counts of the timer
	/// and process drivers.
	@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }

	/** Processes pending events, waiting up to `timeout` for them to arrive.

		Queued thread callbacks are executed before and after the wait. For a
		non-positive `timeout` a single non-blocking pass is performed.
		Otherwise the loop is polled repeatedly, each wait being bounded by
		the next timer timeout, until an event or timer was processed, the
		exit flag was set or `timeout` has elapsed. A `timeout` of
		`Duration.max` is never decremented.

		Returns:
			`ExitReason.exited` if the exit flag was set (the flag gets
			cleared), `ExitReason.outOfWaiters` if `waiterCount` is zero,
			`ExitReason.idle` if events were processed and
			`ExitReason.timeout` otherwise.
	*/
	final override ExitReason processEvents(Duration timeout)
	{
		import core.time : hnsecs, seconds;

		executeThreadCallbacks();

		if (m_exit) {
			m_exit = false;
			return ExitReason.exited;
		}

		if (!waiterCount) {
			return ExitReason.outOfWaiters;
		}

		bool got_events;

		if (timeout <= 0.seconds) {
			got_events = m_loop.doProcessEvents(0.seconds);
			m_timers.process(MonoTime.currTime);
		} else {
			auto now = MonoTime.currTime;
			do {
				// never wait longer than until the next timer is due, and never a negative amount
				auto nextto = max(min(m_timers.getNextTimeout(now), timeout), 0.seconds);
				got_events = m_loop.doProcessEvents(nextto);
				auto prev_step = now;
				now = MonoTime.currTime;
				got_events |= m_timers.process(now);
				if (timeout != Duration.max)
					timeout -= now - prev_step;
			} while (timeout > 0.seconds && !m_exit && !got_events);
		}

		executeThreadCallbacks();

		if (m_exit) {
			m_exit = false;
			return ExitReason.exited;
		}
		if (!waiterCount) {
			return ExitReason.outOfWaiters;
		}
		if (got_events) return ExitReason.idle;
		return ExitReason.timeout;
	}

	/// Sets the exit flag and triggers the wake-up event.
	final override void exit()
	{
		m_exit = true; // FIXME: this needs to be synchronized!
		() @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } ();
	}

	/// Resets the exit flag without processing any events.
	final override void clearExitFlag()
	{
		m_exit = false;
	}

	/** Queues `del` together with `params` for execution by
		`executeThreadCallbacks` and triggers the wake-up event.

		The request is silently dropped if the callback mutex reads as null.
	*/
	final override void runInOwnerThread(ThreadCallbackGen del,
		ref ThreadCallbackGenParams params)
	shared {
		auto m = atomicLoad(m_threadCallbackMutex);
		auto evt = atomicLoad(m_wakeupEvent);
		// NOTE: This case must be handled gracefully to avoid hazardous
		//       race-conditions upon unexpected thread termination. The mutex
		//       and the map will stay valid even after the driver has been
		//       disposed, so no further synchronization is required.
		if (!m) return;

		try {
			synchronized (m)
				() @trusted { return (cast()this).m_threadCallbacks; } ()
					.put(ThreadCallbackEntry(del, params));
		} catch (Exception e) assert(false, e.msg);

		// Use the atomically loaded snapshot instead of reading the field a
		// second time without synchronization - `dispose` overwrites
		// `m_wakeupEvent`.
		m_events.trigger(evt, false);
	}

	alias runInOwnerThread = EventDriverCore.runInOwnerThread;


	/// Forwards to `rawUserDataImpl` for stream socket descriptors.
	final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/// Forwards to `rawUserDataImpl` for datagram socket descriptors.
	final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/// Forwards to the loop's `rawUserDataImpl`.
	protected final void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/// Drains the thread callback queue. Each entry is taken out while
	/// holding the mutex, but invoked after the mutex has been released.
	private void executeThreadCallbacks()
	{
		while (true) {
			ThreadCallbackEntry del;
			try {
				synchronized (m_threadCallbackMutex) {
					if (m_threadCallbacks.empty) break;
					del = m_threadCallbacks.consumeOne;
				}
			} catch (Exception e) assert(false, e.msg);
			del[0](del[1]);
		}
	}
}


/** Base class of the concrete event loops. Owns the table of descriptor
	slots and tracks the number of non-internal handles and waiters.
*/
package class PosixEventLoop {
@safe: nothrow:
	import core.time : Duration;

	package {
		AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot, PipeSlot) m_fds;
		size_t m_handleCount = 0;
		size_t m_waiterCount = 0;
	}

	/// Current length of the slot table, converted to `int`.
	protected @property int maxFD() const { return cast(int)m_fds.length; }

	/// Destroys the slot table.
	protected void dispose() @nogc { destroy(m_fds); }

	/** Waits for and processes a single batch of events.

		Returns:
			Returns `false` if no event was received before the timeout expired
			and `true` if either an event was received, or if the wait was
			interrupted by an error or signal.
	*/
	protected abstract bool doProcessEvents(Duration dur);

	/// Registers the FD for general notification reception.
	protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true) @nogc;
	/// Unregisters the FD for general notification reception.
	protected abstract void unregisterFD(FD fd, EventMask mask) @nogc;
	/// Updates the event mask to use for listening for notifications.
	protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc;

	/// Invokes the callback registered for `evt` on slot `fd`, if there is
	/// one, passing a handle that carries the slot's validation counter.
	final protected void notify(EventType evt)(size_t fd)
	{
		//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
		if (m_fds[fd].common.callback[evt]) {
			auto vc = m_fds[fd].common.validationCounter;
			m_fds[fd].common.callback[evt](FD(fd, vc));
		}
	}

	/// Calls `del` for every slot that has a callback registered for `evt`.
	final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
	{
		// TODO: optimize!
		foreach (i; 0 .. cast(int)m_fds.length)
			if (m_fds[i].common.callback[evt])
				del(FD(i, m_fds[i].common.validationCounter));
	}

	/** Installs (`callback !is null`) or removes (`callback is null`) the
		notification callback of `fd` for `evt`.

		Installing a callback adds a reference to the slot and removing it
		drops that reference again. For slots that are not flagged
		`FDFlags.internal`, `m_waiterCount` is adjusted accordingly.
	*/
	package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
	{
		assert(m_fds[fd.value].common.refCount > 0,
			"Setting notification callback on unreferenced file descriptor slot.");
		assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
			"Overwriting notification callback.");
		// ensure that the FD doesn't get closed before the callback gets called.
		with (m_fds[fd.value]) {
			if (callback !is null) {
				if (!(common.flags & FDFlags.internal)) m_waiterCount++;
				common.refCount++;
			} else {
				common.refCount--;
				if (!(common.flags & FDFlags.internal)) m_waiterCount--;
			}
			common.callback[evt] = callback;
		}
	}

	/** Initializes the slot of descriptor `fd` with `slot_init`, a reference
		count of one and the given `flags`.

		Unless `flags` contains `FDFlags.internal`, `m_handleCount` is
		incremented.

		Returns: A handle of type `FDType` built from `fd` and the slot's
			validation counter.
	*/
	package FDType initFD(FDType, T)(size_t fd, FDFlags flags, auto ref T slot_init)
	{
		uint vc;

		with (m_fds[fd]) {
			assert(common.refCount == 0, "Initializing referenced file descriptor slot.");
			assert(specific.kind == typeof(specific).Kind.none, "Initializing slot that has not been cleared.");
			common.refCount = 1;
			common.flags = flags;
			debug (EventCoreLeakTrace)
			{
				import core.runtime : defaultTraceHandler;
				common.origin = defaultTraceHandler(null);
			}
			specific = slot_init;
			vc = common.validationCounter;
		}

		if (!(flags & FDFlags.internal))
			m_handleCount++;

		return FDType(fd, vc);
	}

	/** Resets the slot of `fd` to its initial state.

		The user data destructor is invoked if one is set. For slots not
		flagged `FDFlags.internal`, `m_handleCount` is decremented and
		`m_waiterCount` is reduced by one per still registered callback. The
		validation counter of the slot is incremented by one.
	*/
	package void clearFD(T)(FD fd)
	{
		import taggedalgebraic : hasType;

		auto slot = () @trusted { return &m_fds[fd.value]; } ();
		assert(slot.common.validationCounter == fd.validationCounter, "Clearing FD slot for invalid FD");
		assert(slot.specific.hasType!T, "Clearing file descriptor slot with unmatched type.");

		if (!(slot.common.flags & FDFlags.internal))
			m_handleCount--;

		if (slot.common.userDataDestructor)
			() @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } ();
		if (!(slot.common.flags & FDFlags.internal))
			foreach (cb; slot.common.callback)
				if (cb !is null)
					m_waiterCount--;

		// carry the incremented validation counter over the reset
		auto vc = slot.common.validationCounter;
		*slot = m_fds.FullField.init;
		slot.common.validationCounter = vc + 1;
	}

	/** Returns a pointer to the user data area of slot `descriptor`.

		On the first request for a slot (no destructor stored yet),
		`initialize` is called on the area and `destroy` is stored as the
		slot's user data destructor.
	*/
	package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system @nogc {
		FDSlot* fds = &m_fds[descriptor].common;
		assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
			"Requesting user data with differing type (destructor).");
		assert(size <= FDSlot.userData.length, "Requested user data is too large.");
		// second check on top of the assertion above, using `assert(false)`
		if (size > FDSlot.userData.length) assert(false);
		if (!fds.userDataDestructor) {
			initialize(fds.userData.ptr);
			fds.userDataDestructor = destroy;
		}
		return fds.userData.ptr;
	}
}


alias FDEnumerateCallback = void delegate(FD);

alias FDSlotCallback = void delegate(FD);

/// State that is stored for every descriptor slot, independent of its kind.
private struct FDSlot {
	FDSlotCallback[EventType.max+1] callback;
	uint refCount;
	uint validationCounter;
	FDFlags flags;

	DataInitializer userDataDestructor;
	ubyte[16*size_t.sizeof] userData;
	debug (EventCoreLeakTrace)
		Throwable.TraceInfo origin;

	/// Mask with one bit set for each event type that has a callback.
	@property EventMask eventMask() const nothrow {
		EventMask ret = cast(EventMask)0;
		if (callback[EventType.read] !is null) ret |= EventMask.read;
		if (callback[EventType.write] !is null) ret |= EventMask.write;
		if (callback[EventType.status] !is null) ret |= EventMask.status;
		return ret;
	}
}

/// Flags of a descriptor slot. Slots flagged `internal` are excluded from
/// the loop's handle and waiter counts.
enum FDFlags {
	none = 0,
	internal = 1<<0,
}

/// Event types; used as index into `FDSlot.callback`.
enum EventType {
	read,
	write,
	status
}

/// Bit mask counterpart of `EventType`.
enum EventMask {
	read = 1<<0,
	write = 1<<1,
	status = 1<<2
}

/// Writes a formatted line to stdout, prefixed with the name of the current
/// thread. Exceptions thrown while writing are ignored.
void log(ARGS...)(string fmt, ARGS args)
@trusted {
	import std.stdio : writef, writefln;
	import core.thread : Thread;
	try {
		writef("[%s]: ", Thread.getThis().name);
		writefln(fmt, args);
	} catch (Exception) {}
}


/*version (Windows) {
	import std.c.windows.windows;
	import std.c.windows.winsock;

	alias EWOULDBLOCK = WSAEWOULDBLOCK;

	extern(System) DWORD FormatMessageW(DWORD dwFlags, const(void)* lpSource, DWORD dwMessageId, DWORD dwLanguageId, LPWSTR lpBuffer, DWORD nSize, void* Arguments);

	class WSAErrorException : Exception {
		int error;

		this(string message, string file = __FILE__, size_t line = __LINE__)
		{
			error = WSAGetLastError();
			this(message, error, file, line);
		}

		this(string message, int error, string file = __FILE__, size_t line = __LINE__)
		{
			import std.string : format;
			ushort* errmsg;
			FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_IGNORE_INSERTS,
				null, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), cast(LPWSTR)&errmsg, 0, null);
			size_t len = 0;
			while (errmsg[len]) len++;
			auto errmsgd = (cast(wchar[])errmsg[0 .. len]).idup;
			LocalFree(errmsg);
			super(format("%s: %s (%s)", message, errmsgd, error), file, line);
		}
	}

	alias SystemSocketException = WSAErrorException;
} else {
	import std.exception : ErrnoException;
	alias SystemSocketException = ErrnoException;
}

T socketEnforce(T)(T value, lazy string msg = null, string file = __FILE__, size_t line = __LINE__)
{
	import std.exception : enforceEx;
	return enforceEx!SystemSocketException(value, msg, file, line);
}*/