1 module eventcore.socket; 2 3 import eventcore.core : eventDriver; 4 import eventcore.driver; 5 import core.time: Duration; 6 import std.exception : enforce; 7 import std.socket : Address; 8 9 10 StreamSocket connectStream(alias callback)(scope Address peer_address) 11 @safe { 12 void cb(StreamSocketFD fd, ConnectStatus status) @safe nothrow { 13 if (fd != StreamSocketFD.invalid) eventDriver.sockets.addRef(fd); 14 callback(StreamSocket(fd), status); 15 if (fd != StreamSocketFD.invalid) eventDriver.sockets.releaseRef(fd); 16 } 17 18 auto fd = eventDriver.sockets.connectStream(peer_address, null, &cb); 19 enforce(fd != StreamSocketFD.invalid, "Failed to create socket."); 20 eventDriver.sockets.addRef(fd); 21 return StreamSocket(fd); 22 } 23 24 StreamListenSocket listenStream(scope Address bind_address) 25 @safe { 26 auto fd = eventDriver.sockets.listenStream(bind_address, null); 27 enforce(fd != StreamListenSocketFD.invalid, "Failed to create socket."); 28 return StreamListenSocket(fd); 29 } 30 31 DatagramSocket createDatagramSocket(scope Address bind_address, scope Address target_address = null) 32 @safe { 33 auto fd = eventDriver.sockets.createDatagramSocket(bind_address, target_address); 34 enforce(fd != DatagramSocketFD.invalid, "Failed to create socket."); 35 return DatagramSocket(fd); 36 } 37 38 struct StreamSocket { 39 @safe: nothrow: 40 41 private StreamSocketFD m_fd; 42 43 private this(StreamSocketFD fd) 44 { 45 m_fd = fd; 46 } 47 48 this(this) { if (m_fd != StreamSocketFD.invalid) eventDriver.sockets.addRef(m_fd); } 49 ~this() { if (m_fd != StreamSocketFD.invalid) eventDriver.sockets.releaseRef(m_fd); } 50 51 @property ConnectionState state() { return eventDriver.sockets.getConnectionState(m_fd); } 52 @property void tcpNoDelay(bool enable) { eventDriver.sockets.setTCPNoDelay(m_fd, enable); } 53 void setKeepAlive(bool enable) { eventDriver.sockets.setKeepAlive(m_fd, enable); } 54 void setKeepAliveParams(Duration idle, Duration interval, int probeCount = 5) { 55 eventDriver.sockets.setKeepAliveParams(m_fd, idle, interval, probeCount); 56 } 57 void setUserTimeout(Duration timeout) { eventDriver.sockets.setUserTimeout(m_fd, timeout); } 58 } 59 60 void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode) 61 { 62 void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { 63 callback(status, nbytes); 64 } 65 eventDriver.sockets.read(socket.m_fd, buffer, mode, &cb); 66 } 67 void cancelRead(ref StreamSocket socket) @safe nothrow { eventDriver.sockets.cancelRead(socket.m_fd); } 68 void waitForData(alias callback)(ref StreamSocket socket) 69 { 70 void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { 71 callback(status, nbytes); 72 } 73 eventDriver.sockets.waitForData(socket.m_fd, &cb); 74 } 75 void write(alias callback)(ref StreamSocket socket, const(ubyte)[] buffer, IOMode mode) 76 { 77 void cb(StreamSocketFD, IOStatus status, size_t nbytes) @safe nothrow { 78 callback(status, nbytes); 79 } 80 eventDriver.sockets.write(socket.m_fd, buffer, mode, &cb); 81 } 82 void cancelWrite(ref StreamSocket socket) @safe nothrow { eventDriver.sockets.cancelWrite(socket.m_fd); } 83 void shutdown(ref StreamSocket socket, bool shut_read = true, bool shut_write = true) 84 @safe nothrow { 85 eventDriver.sockets.shutdown(socket.m_fd, shut_read, shut_write); 86 } 87 88 89 struct StreamListenSocket { 90 @safe: nothrow: 91 92 private StreamListenSocketFD m_fd; 93 94 private this(StreamListenSocketFD fd) 95 { 96 m_fd = fd; 97 } 98 99 this(this) { if (m_fd != StreamListenSocketFD.invalid) eventDriver.sockets.addRef(m_fd); } 100 ~this() { if (m_fd != StreamListenSocketFD.invalid) eventDriver.sockets.releaseRef(m_fd); } 101 } 102 103 void waitForConnections(alias callback)(ref StreamListenSocket socket) 104 { 105 void cb(StreamListenSocketFD, StreamSocketFD sock, scope RefAddress addr) @safe nothrow { 106 auto ss = StreamSocket(sock); 107 callback(ss, addr); 108 } 109 eventDriver.sockets.waitForConnections(socket.m_fd, &cb); 110 } 111 112 113 struct DatagramSocket { 114 @safe: nothrow: 115 116 private DatagramSocketFD m_fd; 117 118 private this(DatagramSocketFD fd) 119 { 120 m_fd = fd; 121 } 122 123 this(this) { if (m_fd != DatagramSocketFD.invalid) eventDriver.sockets.addRef(m_fd); } 124 ~this() { if (m_fd != DatagramSocketFD.invalid) eventDriver.sockets.releaseRef(m_fd); } 125 126 @property void broadcast(bool enable) { eventDriver.sockets.setBroadcast(m_fd, enable); } 127 } 128 129 void receive(alias callback)(ref DatagramSocket socket, ubyte[] buffer, IOMode mode) { 130 void cb(DatagramSocketFD fd, IOStatus status, size_t bytes_written, scope RefAddress address) @safe nothrow { 131 callback(status, bytes_written, address); 132 } 133 eventDriver.sockets.receive(socket.m_fd, buffer, mode, &cb); 134 } 135 void cancelReceive(ref DatagramSocket socket) { eventDriver.sockets.cancelReceive(socket.m_fd); } 136 void send(alias callback)(ref DatagramSocket socket, const(ubyte)[] buffer, IOMode mode, Address target_address = null) { 137 void cb(DatagramSocketFD fd, IOStatus status, size_t bytes_written, scope RefAddress) @safe nothrow { 138 callback(status, bytes_written); 139 } 140 eventDriver.sockets.send(socket.m_fd, buffer, mode, target_address, &cb); 141 } 142 void cancelSend(ref DatagramSocket socket) { eventDriver.sockets.cancelSend(socket.m_fd); }