/** Definition of the core event driver interface.

	This module contains all declarations necessary for defining and using
	event drivers. Event driver implementations will usually inherit from
	`EventDriver` using a `final` class to avoid virtual function overhead.

	Callback_Behavior:
		All callbacks follow the same rules to enable generic implementation
		of high-level libraries, such as vibe.d. Except for "listen" style
		callbacks, each callback will only ever be called at most once.

		If the operation does not get canceled, the callback will be called
		exactly once. In case it gets manually canceled using the corresponding
		API function, the callback is guaranteed to not be called. However,
		the associated operation might still finish - either before the
		cancellation function returns, or afterwards.
*/
module eventcore.driver;

import core.time : Duration;
import std.process : StdProcessConfig = Config;
import std.socket : Address;
import std.stdint : intptr_t;
import std.typecons : Tuple;
import std.variant : Algebraic;


/** Encapsulates a full event driver.

	This interface provides access to the individual driver features, as well as
	a central `dispose` method that must be called before the driver gets
	destroyed or before the process gets terminated.
*/
interface EventDriver {
@safe: /*@nogc:*/ nothrow:
	/// Core event loop functionality
	@property inout(EventDriverCore) core() inout;
	/// Core event loop functionality
	@property shared(inout(EventDriverCore)) core() shared inout;
	/// Single shot and recurring timers
	@property inout(EventDriverTimers) timers() inout;
	/// Cross-thread events (thread local access)
	@property inout(EventDriverEvents) events() inout;
	/// Cross-thread events (cross-thread access)
	@property shared(inout(EventDriverEvents)) events() shared inout;
	/// UNIX/POSIX signal reception
	@property inout(EventDriverSignals) signals() inout;
	/// Stream and datagram sockets
	@property inout(EventDriverSockets) sockets() inout;
	/// DNS queries
	@property inout(EventDriverDNS) dns() inout;
	/// Local file operations
	@property inout(EventDriverFiles) files() inout;
	/// Directory change watching
	@property inout(EventDriverWatchers) watchers() inout;
	/// Sub-process handling
	@property inout(EventDriverProcesses) processes() inout;
	/// Pipes
	@property inout(EventDriverPipes) pipes() inout;

	/** Releases all resources associated with the driver.

		In case of any left-over referenced handles, this function returns
		`false` and does not free any resources. It may choose to free the
		resources once the last handle gets dereferenced.
	*/
	bool dispose();
}


/** Provides generic event loop control.
*/
interface EventDriverCore {
	/** The number of pending callbacks.

		When this number drops to zero, the event loop can safely be quit. It is
		guaranteed that no callbacks will be made anymore, unless new callbacks
		get registered.
	*/
	size_t waiterCount() @safe nothrow;

	/** Runs the event loop to process a chunk of events.

		This method optionally waits for an event to arrive if none are present
		in the event queue. The function will return after either the specified
		timeout has elapsed, or once the event queue has been fully emptied.

		On implementations that support it, the function will treat
		interruptions by POSIX signals as if an event was received and will
		cause it to return. However, note that it is generally recommended to
		use `EventDriverSignals` instead of raw signal handlers in order to
		avoid their pitfalls as far as possible.

		Params:
			timeout = Maximum amount of time to wait for an event. A duration of
				zero will cause the function to only process pending events. A
				duration of `Duration.max`, if necessary, will wait indefinitely
				until an event arrives.
	*/
	ExitReason processEvents(Duration timeout) @safe nothrow;

	/** Causes `processEvents` to return with `ExitReason.exited` as soon as
		possible.

		A call to `processEvents` that is currently in progress will be notified
		so that it returns immediately. If no call is in progress, the next call
		to `processEvents` will immediately return with `ExitReason.exited`.
	*/
	void exit() @safe nothrow;

	/** Resets the exit flag.

		`processEvents` will automatically reset the exit flag before it returns
		with `ExitReason.exited`. However, if `exit` is called outside of
		`processEvents`, the next call to `processEvents` will return with
		`ExitReason.exited` immediately. This function can be used to avoid this.
	*/
	void clearExitFlag() @safe nothrow;

	/** Executes a callback in the thread owning the driver.
	*/
	void runInOwnerThread(ThreadCallbackGen fun, ref ThreadCallbackGenParams params) shared @safe nothrow;
	/// ditto
	final void runInOwnerThread(ARGS...)(void function(ARGS) @safe nothrow fun, ARGS args) shared
		if (ARGS.length != 1 || !is(ARGS[0] == ThreadCallbackGenParams))
	{
		alias F = void function(ARGS) @safe nothrow;
		alias T = Tuple!ARGS;
		// The function pointer and the argument tuple are both stored inside
		// the parameter buffer, so together they must fit into it.
		static assert(F.sizeof + T.sizeof <= ThreadCallbackGenParams.length,
			"Parameters take up too much space.");

		// Pack the function pointer into the first F.sizeof bytes, followed
		// by the argument tuple.
		ThreadCallbackGenParams params;
		() @trusted { (cast(F[])params[0 .. F.sizeof])[0] = fun; } ();
		(cast(T[])params[F.sizeof .. F.sizeof + T.sizeof])[0] = T(args);
		// The generic callback unpacks both parts again and performs the
		// actual call with the expanded tuple.
		runInOwnerThread((ref ThreadCallbackGenParams p) {
			auto f = () @trusted { return cast(F[])p[0 .. F.sizeof]; } ()[0];
			auto pt = () @trusted { return cast(T[])p[F.sizeof .. F.sizeof + T.sizeof]; } ();
			f(pt[0].expand);
		}, params);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
	/// ditto
	protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;

	/** Deprecated - use `EventDriverSockets.userData` instead.
	*/
	deprecated("Use `EventDriverSockets.userData` instead.")
	@property final ref T userData(T, FD)(FD descriptor)
	@trusted nothrow {
		import std.conv : emplace;
		static void init(void* ptr) { emplace(cast(T*)ptr); }
		static void destr(void* ptr) { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}
}


/** Provides access to socket functionality.

	The interface supports two classes of sockets - stream sockets and datagram
	sockets.
*/
interface EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
	/** Connects to a stream listening socket.
	*/
	StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect);

	/** Aborts asynchronous connect by closing the socket.

		This function may only be invoked if the connection state is
		`ConnectionState.connecting`. It will cancel the connection attempt and
		guarantees that the connection callback will not be invoked in the
		future.

		Note that upon completion, the socket handle will be invalid, regardless
		of the number of calls to `addRef`, and must not be used for further
		operations.

		Params:
			sock = Handle of the socket that is currently establishing a
				connection
	*/
	void cancelConnectStream(StreamSocketFD sock);

	/** Adopts an existing stream socket.

		The given socket must be in a connected state. It will be automatically
		switched to non-blocking mode if necessary. Beware that this may have
		side effects in other code that uses the socket and assumes blocking
		operations.

		Params:
			socket = Socket file descriptor to adopt

		Returns:
			Returns a socket handle corresponding to the passed socket
				descriptor. If the same file descriptor is already registered,
				`StreamSocketFD.invalid` will be returned instead.
	*/
	StreamSocketFD adoptStream(int socket);

	/// Creates a socket listening for incoming stream connections.
	StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept);

	/// ditto
	final StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) {
		return listenStream(bind_address, StreamListenOptions.defaults, on_accept);
	}

	/// Starts to wait for incoming connections on a listening socket.
	void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);

	/// Determines the current connection state.
	ConnectionState getConnectionState(StreamSocketFD sock);

	/** Retrieves the bind address of a socket.

		Example:
		The following code can be used to retrieve an IPv4/IPv6 address
		allocated on the stack. Note that Unix domain sockets require a larger
		buffer (e.g. `sockaddr_storage`).
		---
		scope storage = new UnknownAddress;
		scope sockaddr = new RefAddress(storage.name, storage.nameLen);
		eventDriver.sockets.getLocalAddress(sock, sockaddr);
		---
	*/
	bool getLocalAddress(SocketFD sock, scope RefAddress dst);

	/** Retrieves the address of the connected peer.

		Example:
		The following code can be used to retrieve an IPv4/IPv6 address
		allocated on the stack. Note that Unix domain sockets require a larger
		buffer (e.g. `sockaddr_storage`).
		---
		scope storage = new UnknownAddress;
		scope sockaddr = new RefAddress(storage.name, storage.nameLen);
		eventDriver.sockets.getRemoteAddress(sock, sockaddr);
		---
	*/
	bool getRemoteAddress(SocketFD sock, scope RefAddress dst);

	/// Sets the `TCP_NODELAY` option on a socket
	void setTCPNoDelay(StreamSocketFD socket, bool enable);

	/// Sets the `SO_KEEPALIVE` socket option on a socket.
	void setKeepAlive(StreamSocketFD socket, bool enable);

	/** Enables keepalive for the TCP socket and sets additional parameters.
		Silently ignores unsupported systems (anything but Windows and Linux).

		Params:
			socket = Socket file descriptor to set options on.
			idle = The time the connection needs to remain idle
				before TCP starts sending keepalive probes.
			interval = The time between individual keepalive probes.
			probeCount = The maximum number of keepalive probes TCP should send
				before dropping the connection. Has no effect on Windows.
	*/
	void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount = 5);

	/// Sets `TCP_USER_TIMEOUT` socket option (linux only). https://tools.ietf.org/html/rfc5482
	void setUserTimeout(StreamSocketFD socket, Duration timeout);

	/** Reads data from a stream socket.

		Note that only a single read operation is allowed at once. The caller
		needs to make sure that either `on_read_finish` got called, or
		`cancelRead` was called before issuing the next call to `read`.

		Waiting_for_data_availability:
			With the special combination of a zero-length buffer and `mode`
			set to either `IOMode.once` or `IOMode.all`, this function will
			wait until data is available on the socket without reading
			anything.

			Note that in this case the `IOStatus` parameter of the callback
			will not reliably reflect a passive connection close. It is
			necessary to actually read some data to make sure this case
			is detected.
	*/
	void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);

	/** Cancels an ongoing read operation.

		After this function has been called, the `IOCallback` specified in
		the call to `read` is guaranteed to not be called.
	*/
	void cancelRead(StreamSocketFD socket);

	/** Writes data to a stream socket.

		Note that only a single write operation is allowed at once. The caller
		needs to make sure that either `on_write_finish` got called, or
		`cancelWrite` was called before issuing the next call to `write`.
	*/
	void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish);

	/** Cancels an ongoing write operation.

		After this function has been called, the `IOCallback` specified in
		the call to `write` is guaranteed to not be called.
	*/
	void cancelWrite(StreamSocketFD socket);

	/** Waits for incoming data without actually reading it.
	*/
	void waitForData(StreamSocketFD socket, IOCallback on_data_available);

	/** Initiates a connection close.
	*/
	void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write);

	/** Creates a connection-less datagram socket.

		Params:
			bind_address = The local bind address to use for the socket. It
				will be able to receive any messages sent to this address.
			target_address = Optional default target address. If this is
				specified and the target address parameter of `send` is
				left to `null`, it will be used instead.
			options = Optional options for datagram creation. If unset,
				`DatagramCreateOptions.init` is used.

		Returns:
			Returns a datagram socket handle if the socket was created
			successfully. Otherwise returns `DatagramSocketFD.invalid`.
	*/
	DatagramSocketFD createDatagramSocket(scope Address bind_address,
		scope Address target_address,
		DatagramCreateOptions options = DatagramCreateOptions.init);

	/** Adopts an existing datagram socket.

		The socket must be properly bound before this function is called.

		Params:
			socket = Socket file descriptor to adopt

		Returns:
			Returns a socket handle corresponding to the passed socket
				descriptor. If the same file descriptor is already registered,
				`DatagramSocketFD.invalid` will be returned instead.
	*/
	DatagramSocketFD adoptDatagramSocket(int socket);

	/** Sets an address to use as the default target address for sent datagrams.
	*/
	void setTargetAddress(DatagramSocketFD socket, scope Address target_address);

	/// Sets the `SO_BROADCAST` socket option.
	bool setBroadcast(DatagramSocketFD socket, bool enable);

	/// Joins the multicast group associated with the given IP address.
	bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0);

	/// Receives a single datagram.
	void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish);
	/// Cancels an ongoing wait for an incoming datagram.
	void cancelReceive(DatagramSocketFD socket);
	/// Sends a single datagram.
	void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish);
	/// Cancels an ongoing wait for an outgoing datagram.
	void cancelSend(DatagramSocketFD socket);

	/** Determines whether the given socket handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called and I/O operations will result in
		`IOStatus.invalidHandle`.

		A valid handle gets invalid when either the reference count drops to
		zero, or after the socket was explicitly closed.
	*/
	bool isValid(SocketFD handle) const @nogc;

	/** Increments the reference count of the given socket.
	*/
	void addRef(SocketFD descriptor);

	/** Decrements the reference count of the given socket.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(SocketFD descriptor);

	/** Enables or disables a socket option.
	*/
	bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable);
	/// ditto
	bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable);

	/** Retrieves a reference to a user-defined value associated with a descriptor.
	*/
	@property final ref T userData(T, FD)(FD descriptor) @trusted @nogc
		if (hasNoGCLifetime!T)
	{
		import std.conv : emplace;
		static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
		static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}
	/// ditto
	deprecated("Only @nogc constructible and destructible user data allowed.")
	@property final ref T userData(T, FD)(FD descriptor) @trusted
		if (!hasNoGCLifetime!T)
	{
		import std.conv : emplace;
		static void init(void* ptr) { emplace(cast(T*)ptr); }
		static void destr(void* ptr) { destroy(*cast(T*)ptr); }
		// The helpers are not @nogc for such a T, so they are force-cast to
		// the @nogc function pointer type that `rawUserData` is called with.
		static if (__traits(compiles, () nothrow { init(null); destr(null); }))
			alias F = void function(void*) @nogc nothrow;
		else alias F = void function(void*) @nogc;
		return *cast(T*)rawUserData(descriptor, T.sizeof, cast(F)&init, cast(F)&destr);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc;
	/// ditto
	protected void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc;
	/// ditto
	protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system @nogc;
}

/// Evaluates to `true` if `T` can be emplaced and destroyed within `@nogc` code.
enum hasNoGCLifetime(T) = __traits(compiles, () @nogc @trusted { import std.conv : emplace; T b = void; emplace!T(&b); destroy(b); });
unittest {
	static struct S1 {}
	static struct S2 { ~this() { new int; } }
	static assert(hasNoGCLifetime!S1);
	static assert(!hasNoGCLifetime!S2);
}


/** Performs asynchronous DNS queries.
*/
interface EventDriverDNS {
@safe: /*@nogc:*/ nothrow:
	/// Looks up addresses corresponding to the given DNS name.
	DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished);

	/// Cancels an ongoing DNS lookup.
	void cancelLookup(DNSLookupID handle);

	/** Determines whether the given DNS lookup handle is valid.

		A valid handle gets invalid when the lookup has completed or got
		cancelled.
	*/
	bool isValid(DNSLookupID handle) const @nogc;
}


/** Provides read/write operations on the local file system.
*/
interface EventDriverFiles {
@safe: /*@nogc:*/ nothrow:
	FileFD open(string path, FileOpenMode mode);
	FileFD adopt(int system_file_handle);

	/** Disallows any reads/writes and removes any exclusive locks.

		Note that the file handle may become invalid at any point after the
		call to `close`, regardless of its current reference count. Any
		operations on the handle will not have an effect.
	*/
	void close(FileFD file, FileCloseCallback on_closed);

	ulong getSize(FileFD file);

	/** Shrinks or extends a file to the specified size.

		Params:
			file = Handle of the file to resize
			size = Desired file size in bytes
			on_finish = Called when the operation finishes - the `size`
				parameter is always set to zero
	*/
	void truncate(FileFD file, ulong size, FileIOCallback on_finish);

	/** Writes data to a file

		Note that only a single write operation is allowed at once. The caller
		needs to make sure that either `on_write_finish` got called, or
		`cancelWrite` was called before issuing the next call to `write`.
	*/
	void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode mode, FileIOCallback on_write_finish);

	/** Cancels an ongoing write operation.

		After this function has been called, the `FileIOCallback` specified in
		the call to `write` is guaranteed not to be called.
	*/
	void cancelWrite(FileFD file);

	/** Reads data from a file.

		Note that only a single read operation is allowed at once. The caller
		needs to make sure that either `on_read_finish` got called, or
		`cancelRead` was called before issuing the next call to `read`.
	*/
	void read(FileFD file, ulong offset, ubyte[] buffer, IOMode mode, FileIOCallback on_read_finish);

	/** Cancels an ongoing read operation.

		After this function has been called, the `FileIOCallback` specified in
		the call to `read` is guaranteed not to be called.
	*/
	void cancelRead(FileFD file);

	/** Determines whether the given file handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called and I/O operations will result in
		`IOStatus.invalidHandle`.

		A valid handle gets invalid when either the reference count drops to
		zero, or after the file was explicitly closed.
	*/
	bool isValid(FileFD handle) const @nogc;

	/** Increments the reference count of the given file.
	*/
	void addRef(FileFD descriptor);

	/** Decrements the reference count of the given file.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(FileFD descriptor);

	/** Retrieves a reference to a user-defined value associated with a descriptor.
	*/
	@property final ref T userData(T)(FileFD descriptor)
	@trusted {
		import std.conv : emplace;
		static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
		static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}


/** Cross-thread notifications

	"Events" can be used to wake up the event loop of a foreign thread. This is
	the basis for all kinds of thread synchronization primitives, such as
	mutexes, condition variables, message queues etc. Such primitives, in case
	of extended wait periods, should use events rather than traditional means
	to block, such as busy loops or kernel based wait mechanisms to avoid
	stalling the event loop.
*/
interface EventDriverEvents {
@safe: /*@nogc:*/ nothrow:
	/// Creates a new cross-thread event.
	EventID create();

	/// Triggers an event owned by the current thread.
	void trigger(EventID event, bool notify_all);

	/// Triggers an event possibly owned by a different thread.
	void trigger(EventID event, bool notify_all) shared;

	/** Waits until an event gets triggered.

		Multiple concurrent waits are allowed.
	*/
	void wait(EventID event, EventCallback on_event);

	/// Cancels an ongoing wait operation.
	void cancelWait(EventID event, EventCallback on_event);

	/** Determines whether the given event handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called.

		A valid handle gets invalid when the reference count drops to zero.
	*/
	bool isValid(EventID handle) const @nogc;

	/** Increments the reference count of the given event.
	*/
	void addRef(EventID descriptor);

	/** Decrements the reference count of the given event.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(EventID descriptor);

	/** Retrieves a reference to a user-defined value associated with a descriptor.
	*/
	@property final ref T userData(T)(EventID descriptor)
	@trusted {
		import std.conv : emplace;
		static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
		static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(EventID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}


/** Handling of POSIX signals.
*/
interface EventDriverSignals {
@safe: /*@nogc:*/ nothrow:
	/** Starts listening for the specified POSIX signal.

		Note that if a default signal handler exists for the signal, it will be
		disabled by using this function.

		Params:
			sig = The number of the signal to listen for
			on_signal = Callback that gets called whenever a matching signal
				gets received

		Returns:
			Returns an identifier that identifies the resource associated with
			the signal. Giving up ownership of this resource using `releaseRef`
			will re-enable the default signal handler, if any was present.

			For any error condition, `SignalListenID.invalid` will be returned
			instead.
	*/
	SignalListenID listen(int sig, SignalCallback on_signal);

	/** Determines whether the given signal handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called.

		A valid handle gets invalid when the reference count drops to zero.
	*/
	bool isValid(SignalListenID handle) const @nogc;

	/** Increments the reference count of the given resource.
	*/
	void addRef(SignalListenID descriptor);

	/** Decrements the reference count of the given resource.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(SignalListenID descriptor);
}

/** Single shot and recurring timers.
*/
interface EventDriverTimers {
@safe: /*@nogc:*/ nothrow:
	TimerID create();

	/** Run the timer.

		Params:
			timer = the id of the timer, created by `create` call.
			timeout = a duration to the first firing of the timer
			repeat = a duration between periodic timer firings - set to zero
				to set a single-fire timer
	*/
	void set(TimerID timer, Duration timeout, Duration repeat);

	void stop(TimerID timer);

	bool isPending(TimerID timer);
	bool isPeriodic(TimerID timer);

	/** Waits for the timer to fire.

		Important: the callback of the timer will be called exactly once, unless
		`cancelWait` gets called first. `wait` needs to be called again to
		receive future timer events (see https://github.com/vibe-d/eventcore/issues/172
		for reasons behind that behavior).

		Note that the `TimerCallback` based overload will not call the
		callback if `stop` gets called before the timer fires, whereas the
		`TimerCallback2` based overload will call the callback with the `fired`
		parameter set to `false`.
	*/
	final void wait(TimerID timer, TimerCallback callback) {
		// Forward to the `TimerCallback2` overload and drop the notifications
		// where the timer did not actually fire.
		wait(timer, (tm, fired) {
			if (fired) callback(tm);
		});
	}
	/// ditto
	void wait(TimerID timer, TimerCallback2 callback);
	void cancelWait(TimerID timer);

	/** Determines whether the given timer handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called.

		A valid handle gets invalid when the reference count drops to zero.
	*/
	bool isValid(TimerID handle) const @nogc;

	/** Increments the reference count of the given resource.
	*/
	void addRef(TimerID descriptor);

	/** Decrements the reference count of the given resource.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(TimerID descriptor);

	/// Determines if the given timer's reference count equals one.
	bool isUnique(TimerID descriptor) const;

	/** Retrieves a reference to a user-defined value associated with a descriptor.
	*/
	@property final ref T userData(T)(TimerID descriptor)
	@trusted {
		import std.conv : emplace;
		static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
		static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(TimerID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}

/** Directory change watching.
*/
interface EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
	/// Watches a directory or a directory sub tree for changes.
	WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback);

	/** Determines whether the given watcher handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called.

		A valid handle gets invalid when the reference count drops to zero.
	*/
	bool isValid(WatcherID handle) const @nogc;

	/** Increments the reference count of the given resource.
	*/
	void addRef(WatcherID descriptor);

	/** Decrements the reference count of the given resource.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(WatcherID descriptor);

	/** Retrieves a reference to a user-defined value associated with a descriptor.
	*/
	@property final ref T userData(T)(WatcherID descriptor)
	@trusted {
		import std.conv : emplace;
		static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
		static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(WatcherID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}

/** Sub-process handling.
*/
interface EventDriverProcesses {
@safe: /*@nogc:*/ nothrow:
	/** Adopt an existing process.
	*/
	ProcessID adopt(int system_pid);

	/** Spawn a child process.

		Note that if a default signal handler exists for the signal, it will be
		disabled by using this function.

		Params:
			args = The program arguments. First one must be an executable.
			stdin = What should be done for stdin. Allows inheritance, piping,
				nothing or any specific fd. If this results in a Pipe,
				the PipeFD will be set in the stdin result.
			stdout = See stdin, but also allows redirecting to stderr.
			stderr = See stdin, but also allows redirecting to stdout.
			env = The environment variables to spawn the process with.
			config = Special process configurations.
			working_dir = What to set the working dir in the process.

		Returns:
			Returns a Process struct containing the ProcessID and whatever
			pipes have been adopted for stdin, stdout and stderr.
	*/
	Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env = null, ProcessConfig config = ProcessConfig.none, string working_dir = null);

	/** Returns whether the process has exited yet.
	*/
	bool hasExited(ProcessID pid);

	/** Kill the process using the given signal. Has different effects on different platforms.
	*/
	void kill(ProcessID pid, int signal);

	/** Wait for the process to exit.

		Returns an identifier that can be used to cancel the wait.
	*/
	size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit);

	/** Cancel a wait for the given identifier returned by wait.
	*/
	void cancelWait(ProcessID pid, size_t waitId);

	/** Determines whether the given process handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called.

		A valid handle gets invalid when the reference count drops to zero.
	*/
	bool isValid(ProcessID handle) const @nogc;

	/** Increments the reference count of the given resource.
	*/
	void addRef(ProcessID pid);

	/** Decrements the reference count of the given resource.

		Once the reference count reaches zero, all associated resources will be
		freed and the resource descriptor gets invalidated. This will not kill
		the sub-process, nor "detach" it.

		Returns:
			Returns `false` $(I iff) the last reference was removed by this call.

			Passing an invalid handle will result in a return value of `true`.
	*/
	bool releaseRef(ProcessID pid);

	/** Retrieves a reference to a user-defined value associated with a descriptor.
	*/
	@property final ref T userData(T)(ProcessID descriptor)
	@trusted {
		import std.conv : emplace;
		static void init(void* ptr) @nogc { emplace(cast(T*)ptr); }
		static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); }
		return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
	}

	/// Low-level user data access. Use `userData` instead.
	protected void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
}

/** Pipes.
*/
interface EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
	/** Adopt an existing pipe. This will modify the pipe to be non-blocking.

		Note that pipes generally only allow either reads or writes but not
		both, it is up to you to only call valid functions.
	*/
	PipeFD adopt(int system_pipe_handle);

	/** Reads data from a pipe.

		Note that only a single read operation is allowed at once. The caller
		needs to make sure that either `on_read_finish` got called, or
		`cancelRead` was called before issuing the next call to `read`.
	*/
	void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish);

	/** Cancels an ongoing read operation.

		After this function has been called, the `PipeIOCallback` specified in
		the call to `read` is guaranteed to not be called.
	*/
	void cancelRead(PipeFD pipe);

	/** Writes data to a pipe.

		Note that only a single write operation is allowed at once. The caller
		needs to make sure that either `on_write_finish` got called, or
		`cancelWrite` was called before issuing the next call to `write`.
	*/
	void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish);

	/** Cancels an ongoing write operation.

		After this function has been called, the `PipeIOCallback` specified in
		the call to `write` is guaranteed to not be called.
	*/
	void cancelWrite(PipeFD pipe);

	/** Waits for incoming data without actually reading it.
	*/
	void waitForData(PipeFD pipe, PipeIOCallback on_data_available);

	/** Immediately close the pipe. Future read or write operations may fail.

		Note that the file handle may become invalid at any point after the
		call to `close`, regardless of its current reference count. Any
		operations on the handle will not have an effect.
	*/
	void close(PipeFD file, PipeCloseCallback on_closed);

	/** Determines whether the given pipe handle is valid.

		A handle that is invalid will result in no operations being carried out
		when used. In particular `addRef`/`releaseRef` will have no effect, but
		can safely be called and I/O operations will result in
		`IOStatus.invalidHandle`.

		A valid handle gets invalid when either the reference count drops to
		zero, or the pipe is explicitly closed.
	*/
	bool isValid(PipeFD handle) const @nogc;

	/** Increments the reference count of the given resource.
	*/
	void addRef(PipeFD pid);

	/** Decrements the reference count of the given resource.
967 968 Once the reference count reaches zero, all associated resources will be 969 freed and the resource descriptor gets invalidated. 970 971 Returns: 972 Returns `false` $(I iff) the last reference was removed by this call. 973 974 Passing an invalid handle will result in a return value of `true`. 975 */ 976 bool releaseRef(PipeFD pid); 977 978 /** Retrieves a reference to a user-defined value associated with a descriptor. 979 */ 980 @property final ref T userData(T)(PipeFD descriptor) 981 @trusted { 982 import std.conv : emplace; 983 static void init(void* ptr) @nogc { emplace(cast(T*)ptr); } 984 static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); } 985 return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); 986 } 987 988 /// Low-level user data access. Use `userData` instead. 989 protected void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system; 990 991 } 992 993 // Helper class to enable fully stack allocated `std.socket.Address` instances. 
/** `Address` implementation that merely refers to an externally provided
	`sockaddr` buffer.

	Only the pointer and the length are stored; the address data itself is
	neither copied nor released by this class.
*/
final class RefAddress : Address {
	version (Posix) import core.sys.posix.sys.socket : sockaddr, socklen_t;
	version (Windows) import core.sys.windows.winsock2 : sockaddr, socklen_t;

	private sockaddr* m_addr;
	private socklen_t m_addrLen;

	/// Creates an instance that does not refer to any address yet.
	this() @safe nothrow {}

	/// Creates an instance referring to `addr_len` bytes at `addr`.
	this(sockaddr* addr, socklen_t addr_len)
	@safe nothrow {
		m_addr = addr;
		m_addrLen = addr_len;
	}

	/// Pointer to the referenced socket address.
	override @property sockaddr* name()
	scope {
		return m_addr;
	}

	/// ditto
	override @property const(sockaddr)* name()
	const scope {
		return m_addr;
	}

	/// Length of the referenced socket address.
	override @property socklen_t nameLen()
	const scope {
		return m_addrLen;
	}

	/// Points this instance to a different address buffer.
	void set(sockaddr* addr, socklen_t addr_len)
	@safe nothrow {
		m_addr = addr;
		m_addrLen = addr_len;
	}

	/// Reduces the stored address length; the length must not increase.
	void cap(socklen_t new_len)
	scope @safe nothrow {
		assert(new_len <= m_addrLen, "Cannot grow size of a RefAddress.");
		m_addrLen = new_len;
	}
}

@safe: /*@nogc:*/ nothrow:

// Stream and datagram socket callbacks
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address);
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);

// DNS callbacks
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);

// File and pipe callbacks
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias FileCloseCallback = void delegate(FileFD, CloseStatus);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
alias PipeCloseCallback = void delegate(PipeFD, CloseStatus);

// Event, signal and timer callbacks
alias EventCallback = void delegate(EventID);
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);
alias TimerCallback = void delegate(TimerID);
alias TimerCallback2 = void delegate(TimerID, bool fired);

// Directory watcher callbacks
alias FileChangesCallback = void delegate(WatcherID, ref const FileChange change);

/// Callback type taking a process handle and an `int` (presumably the exit code - confirm against the drivers).
alias ProcessWaitCallback = void delegate(ProcessID, int);
/// Function type used by `rawUserData` to initialize and to destroy user data.
@system alias DataInitializer = void function(void*) @nogc;

/// Generic redirection choices for a standard stream of a spawned process.
enum ProcessRedirect { inherit, pipe, none }
/// Either a specific file descriptor or a `ProcessRedirect` mode.
alias ProcessStdinFile = Algebraic!(int, ProcessRedirect);
/// Additional redirection choice that is only available for stdout.
enum ProcessStdoutRedirect { toStderr }
/// Like `ProcessStdinFile`, but additionally allows `ProcessStdoutRedirect`.
alias ProcessStdoutFile = Algebraic!(int, ProcessRedirect, ProcessStdoutRedirect);
/// Additional redirection choice that is only available for stderr.
enum ProcessStderrRedirect { toStdout }
/// Like `ProcessStdinFile`, but additionally allows `ProcessStderrRedirect`.
alias ProcessStderrFile = Algebraic!(int, ProcessRedirect, ProcessStderrRedirect);

enum ExitReason {
	timeout,
	idle,
	outOfWaiters,
	exited
}

enum CloseStatus {
	ok,
	ioError,
	invalidHandle,
}

enum ConnectStatus {
	connected,
	refused,
	timeout,
	bindFailure,
	socketCreateFailure,
	unknownError
}

enum ConnectionState {
	initialized,
	connecting,
	connected,
	passiveClose,
	activeClose,
	closed
}

enum StreamListenOptions {
	none = 0,
	/// Applies the `SO_REUSEPORT` flag
	reusePort = 1<<0,
	// NOTE(review): "Avoids" looks inverted given that `defaults` includes
	// this flag - confirm against the driver implementations.
	/// Avoids applying the `SO_REUSEADDR` flag
	reuseAddress = 1<<1,
	///
	defaults = reuseAddress,
}

enum StreamSocketOption {
	noDelay,
	keepAlive
}

enum DatagramCreateOptions {
	none = 0,
	/// Applies the `SO_REUSEPORT` flag
	reusePort = 1<<0,
	// NOTE(review): "Avoids" may be inverted here as well - confirm against
	// the driver implementations.
	/// Avoids applying the `SO_REUSEADDR` flag
	reuseAddress = 1<<1,
}

enum DatagramSocketOption {
	broadcast,
	multicastLoopback
}

/**
	Specifies how a file is manipulated on disk.
*/
enum FileOpenMode {
	/// The file is opened read-only.
	read,
	/// The file is opened for read-write random access.
	readWrite,
	/// The file is truncated if it exists or created otherwise and then opened for read-write access.
	createTrunc,
	/// The file is opened for appending data to it and created if it does not exist.
	append
}

enum IOMode {
	immediate, /// Process only as much as possible without waiting
	once,      /// Process as much as possible with a single call
	all        /// Process the full buffer
}

enum IOStatus {
	ok,            /// The data has been transferred normally
	disconnected,  /// The connection was closed before all data could be transferred
	error,         /// An error occurred while transferring the data
	wouldBlock,    /// Returned for `IOMode.immediate` when no data is readily readable/writable
	invalidHandle, /// The passed handle is not valid
}

enum DNSStatus {
	ok,
	error
}

/** Specifies the kind of change in a watched directory.
*/
enum FileChangeKind {
	/// A file or directory was added
	added,
	/// A file or directory was deleted
	removed,
	/// A file or directory was modified
	modified
}

enum SignalStatus {
	ok,
	error
}

/// See std.process.Config
enum ProcessConfig {
	none = StdProcessConfig.none,
	detached = StdProcessConfig.detached,
	newEnv = StdProcessConfig.newEnv,
	suppressConsole = StdProcessConfig.suppressConsole,
}

/** Describes a single change in a watched directory.
*/
struct FileChange {
	/// The type of change
	FileChangeKind kind;

	/// The root directory of the watcher
	string baseDirectory;

	/// Subdirectory containing the changed file
	string directory;

	/// Name of the changed file
	const(char)[] name;
}

/** Describes a spawned process
*/
struct Process {
	/// Handle of the process
	ProcessID pid;

	// TODO: Convert these to PipeFD once dmd is fixed
	// NOTE(review): the fields below are already declared as `PipeFD`, so
	// the TODO above looks stale - confirm and remove.
	PipeFD stdin;
	PipeFD stdout;
	PipeFD stderr;
}

/** Mixin that implements the shared parts of all typed resource handles.

	A handle wraps a `value` of type `T`. If `T` is itself a handle type
	(i.e. it declares `BaseType`), the new handle shares `T.BaseType` and
	forwards the validation counter to the wrapped handle. Otherwise `T` is
	the base type and the validation counter is stored in the handle itself.

	Params:
		NAME = Name of the handle kind, exposed as `name`
		T = Type of the wrapped value
		invalid_value = Value that `value` is initialized with
*/
mixin template Handle(string NAME, T, T invalid_value = T.init) {
	alias name = NAME;

	/// The default-initialized handle
	enum invalid = typeof(this).init;

	nothrow @nogc @safe:

	T value = invalid_value;

	static if (is(T.BaseType)) {
		// derived handle: the wrapped handle carries the validation counter
		alias BaseType = T.BaseType;

		this(BaseType value, uint validation_counter)
		{
			this.value = T(value, validation_counter);
		}
	} else {
		// root handle: stores the validation counter itself
		alias BaseType = T;

		uint validationCounter;

		this(BaseType value, uint validation_counter)
		{
			this.validationCounter = validation_counter;
			this.value = value;
		}
	}

	/// Converts to another handle type, carrying over the validation counter.
	U opCast(U)() const
		if (is(U.BaseType) && is(typeof(U.value)))
	{
		// TODO: verify that U derives from typeof(this)!
		auto raw = cast(U.BaseType)value;
		return U(raw, validationCounter);
	}

	/// Converts the wrapped value to any type that is constructible from `BaseType`.
	U opCast(U)() const
		if (is(typeof(U(BaseType.init))))
	{
		return cast(U)value;
	}

	alias value this;
}

alias ThreadCallbackGenParams = ubyte[8 * intptr_t.sizeof];
alias ThreadCallbackGen = void function(ref ThreadCallbackGenParams param3) @safe nothrow;
deprecated alias ThreadCallback = void function(intptr_t param1) @safe nothrow;

struct FD { mixin Handle!("fd", size_t, size_t.max); }
struct SocketFD { mixin Handle!("socket", FD); }
struct StreamSocketFD { mixin Handle!("streamSocket", SocketFD); }
struct StreamListenSocketFD { mixin Handle!("streamListen", SocketFD); }
struct DatagramSocketFD { mixin Handle!("datagramSocket", SocketFD); }
struct FileFD { mixin Handle!("file", FD); }
// FD.init is required here due to https://issues.dlang.org/show_bug.cgi?id=19585
struct PipeFD { mixin Handle!("pipe", FD, FD.init); }
struct EventID { mixin Handle!("event", FD); }
struct TimerID { mixin Handle!("timer", size_t, size_t.max); }
struct WatcherID { mixin Handle!("watcher", size_t, size_t.max); }
struct EventWaitID { mixin Handle!("eventWait", size_t, size_t.max); }
struct SignalListenID { mixin Handle!("signal", size_t, size_t.max); }
struct DNSLookupID { mixin Handle!("dns", size_t, size_t.max); }
struct ProcessID { mixin Handle!("process", size_t, size_t.max); }