1 module eventcore.socket;
2 
3 import eventcore.core : eventDriver;
4 import eventcore.driver;
5 import core.time: Duration;
6 import std.exception : enforce;
7 import std.socket : Address;
8 
9 
/** Starts a stream connection to `peer_address` through the event driver.

	`callback` is invoked with a `StreamSocket` wrapping the descriptor
	reported by the driver, together with the `ConnectStatus`.

	Returns: a `StreamSocket` wrapping the descriptor returned by the driver.

	Throws: `Exception` (via `enforce`) if the driver returns an invalid
	descriptor.
*/
StreamSocket connectStream(alias callback)(scope Address peer_address)
@safe {
	void onConnect(StreamSocketFD conn, ConnectStatus status)
	@safe nothrow {
		const has_socket = conn != StreamSocketFD.invalid;

		// Take an extra reference for the duration of the user callback and
		// drop it again afterwards. The temporary StreamSocket handed to the
		// callback was built with the private constructor, which does not
		// add a reference on its own.
		if (has_socket)
			eventDriver.sockets.addRef(conn);

		callback(StreamSocket(conn), status);

		if (has_socket)
			eventDriver.sockets.releaseRef(conn);
	}

	// The second driver argument is passed as null
	// (presumably the local bind address - TODO confirm against the driver API).
	auto sock = eventDriver.sockets.connectStream(peer_address, null, &onConnect);
	enforce(sock != StreamSocketFD.invalid, "Failed to create socket.");

	// One additional reference is taken before wrapping the descriptor.
	eventDriver.sockets.addRef(sock);
	return StreamSocket(sock);
}
23 
/** Creates a listening stream socket for `bind_address` through the event driver.

	Returns: a `StreamListenSocket` wrapping the descriptor returned by the driver.

	Throws: `Exception` (via `enforce`) if the driver returns an invalid
	descriptor.
*/
StreamListenSocket listenStream(scope Address bind_address)
@safe {
	// The driver's second argument is passed as null here
	// (presumably the accept callback - TODO confirm against the driver API).
	auto listener = eventDriver.sockets.listenStream(bind_address, null);
	enforce(listener != StreamListenSocketFD.invalid, "Failed to create socket.");

	return StreamListenSocket(listener);
}
30 
/** Creates a datagram socket through the event driver.

	Both `bind_address` and the optional `target_address` (default `null`)
	are forwarded unchanged to the driver.

	Returns: a `DatagramSocket` wrapping the descriptor returned by the driver.

	Throws: `Exception` (via `enforce`) if the driver returns an invalid
	descriptor.
*/
DatagramSocket createDatagramSocket(scope Address bind_address, scope Address target_address = null)
@safe {
	auto dgram = eventDriver.sockets.createDatagramSocket(bind_address, target_address);
	enforce(dgram != DatagramSocketFD.invalid, "Failed to create socket.");

	return DatagramSocket(dgram);
}
37 
/** Value-type handle around a `StreamSocketFD`.

	Copying the struct adds a driver reference to the descriptor and
	destroying it releases one; both are skipped for an invalid descriptor.
	All option setters forward directly to `eventDriver.sockets`.
*/
struct StreamSocket {
	@safe: nothrow:

	private StreamSocketFD m_fd;

	/// Wraps `fd` as-is; no reference is added here.
	private this(StreamSocketFD fd)
	{
		this.m_fd = fd;
	}

	/// Postblit: every copy holds its own reference.
	this(this)
	{
		if (m_fd == StreamSocketFD.invalid) return;
		eventDriver.sockets.addRef(m_fd);
	}

	/// Releases the reference held by this instance.
	~this()
	{
		if (m_fd == StreamSocketFD.invalid) return;
		eventDriver.sockets.releaseRef(m_fd);
	}

	/// Connection state as reported by the driver.
	@property ConnectionState state()
	{
		return eventDriver.sockets.getConnectionState(m_fd);
	}

	/// Forwards to the driver's `setTCPNoDelay`.
	@property void tcpNoDelay(bool enable)
	{
		eventDriver.sockets.setTCPNoDelay(m_fd, enable);
	}

	/// Forwards to the driver's `setKeepAlive`.
	void setKeepAlive(bool enable)
	{
		eventDriver.sockets.setKeepAlive(m_fd, enable);
	}

	/// Forwards to the driver's `setKeepAliveParams`; `probeCount` defaults to 5.
	void setKeepAliveParams(Duration idle, Duration interval, int probeCount = 5)
	{
		eventDriver.sockets.setKeepAliveParams(m_fd, idle, interval, probeCount);
	}

	/// Forwards to the driver's `setUserTimeout`.
	void setUserTimeout(Duration timeout)
	{
		eventDriver.sockets.setUserTimeout(m_fd, timeout);
	}
}
59 
/** Issues a read on `socket` into `buffer` using the given `mode`.

	`callback` is invoked as `callback(status, nbytes)`; the descriptor
	argument supplied by the driver is not forwarded.
*/
void read(alias callback)(ref StreamSocket socket, ubyte[] buffer, IOMode mode)
{
	void onRead(StreamSocketFD, IOStatus io_status, size_t count)
	@safe nothrow {
		callback(io_status, count);
	}

	eventDriver.sockets.read(socket.m_fd, buffer, mode, &onRead);
}
/// Forwards to the driver's `cancelRead` for the descriptor held by `socket`.
void cancelRead(ref StreamSocket socket)
@safe nothrow {
	eventDriver.sockets.cancelRead(socket.m_fd);
}
/** Registers `callback` with the driver's `waitForData` for `socket`.

	`callback` is invoked as `callback(status, nbytes)`; the descriptor
	argument supplied by the driver is not forwarded.
*/
void waitForData(alias callback)(ref StreamSocket socket)
{
	void onData(StreamSocketFD, IOStatus io_status, size_t count)
	@safe nothrow {
		callback(io_status, count);
	}

	eventDriver.sockets.waitForData(socket.m_fd, &onData);
}
/** Issues a write of `buffer` on `socket` using the given `mode`.

	`callback` is invoked as `callback(status, nbytes)`; the descriptor
	argument supplied by the driver is not forwarded.
*/
void write(alias callback)(ref StreamSocket socket, const(ubyte)[] buffer, IOMode mode)
{
	void onWritten(StreamSocketFD, IOStatus io_status, size_t count)
	@safe nothrow {
		callback(io_status, count);
	}

	eventDriver.sockets.write(socket.m_fd, buffer, mode, &onWritten);
}
/// Forwards to the driver's `cancelWrite` for the descriptor held by `socket`.
void cancelWrite(ref StreamSocket socket)
@safe nothrow {
	eventDriver.sockets.cancelWrite(socket.m_fd);
}
/** Forwards to the driver's `shutdown` for the descriptor held by `socket`.

	Both `shut_read` and `shut_write` default to `true` and are passed on
	unchanged.
*/
void shutdown(ref StreamSocket socket, bool shut_read = true, bool shut_write = true)
@safe nothrow {
	auto sockets = eventDriver.sockets;
	sockets.shutdown(socket.m_fd, shut_read, shut_write);
}
87 
88 
/** Value-type handle around a `StreamListenSocketFD`.

	Copying the struct adds a driver reference to the descriptor and
	destroying it releases one; both are skipped for an invalid descriptor.
*/
struct StreamListenSocket {
	@safe: nothrow:

	private StreamListenSocketFD m_fd;

	/// Wraps `fd` as-is; no reference is added here.
	private this(StreamListenSocketFD fd)
	{
		this.m_fd = fd;
	}

	/// Postblit: every copy holds its own reference.
	this(this)
	{
		if (m_fd == StreamListenSocketFD.invalid) return;
		eventDriver.sockets.addRef(m_fd);
	}

	/// Releases the reference held by this instance.
	~this()
	{
		if (m_fd == StreamListenSocketFD.invalid) return;
		eventDriver.sockets.releaseRef(m_fd);
	}
}
102 
/** Registers `callback` with the driver's `waitForConnections` for `socket`.

	For each notification the reported `StreamSocketFD` is wrapped in a
	`StreamSocket` local to the handler, and `callback` is invoked as
	`callback(stream_socket, address)`. The listening descriptor supplied by
	the driver is not forwarded.
*/
void waitForConnections(alias callback)(ref StreamListenSocket socket)
{
	void onAccept(StreamListenSocketFD, StreamSocketFD accepted, scope RefAddress remote)
	@safe nothrow {
		// Named local (not a temporary): it is destroyed, releasing its
		// reference, when this handler returns.
		auto client = StreamSocket(accepted);
		callback(client, remote);
	}

	eventDriver.sockets.waitForConnections(socket.m_fd, &onAccept);
}
111 
112 
/** Value-type handle around a `DatagramSocketFD`.

	Copying the struct adds a driver reference to the descriptor and
	destroying it releases one; both are skipped for an invalid descriptor.
*/
struct DatagramSocket {
	@safe: nothrow:

	private DatagramSocketFD m_fd;

	/// Wraps `fd` as-is; no reference is added here.
	private this(DatagramSocketFD fd)
	{
		this.m_fd = fd;
	}

	/// Postblit: every copy holds its own reference.
	this(this)
	{
		if (m_fd == DatagramSocketFD.invalid) return;
		eventDriver.sockets.addRef(m_fd);
	}

	/// Releases the reference held by this instance.
	~this()
	{
		if (m_fd == DatagramSocketFD.invalid) return;
		eventDriver.sockets.releaseRef(m_fd);
	}

	/// Forwards to the driver's `setBroadcast`.
	@property void broadcast(bool enable)
	{
		eventDriver.sockets.setBroadcast(m_fd, enable);
	}
}
128 
/** Issues a receive on `socket` into `buffer` using the given `mode`.

	`callback` is invoked as `callback(status, nbytes, address)`; the
	descriptor argument supplied by the driver is not forwarded.
*/
void receive(alias callback)(ref DatagramSocket socket, ubyte[] buffer, IOMode mode)
{
	void onReceived(DatagramSocketFD, IOStatus io_status, size_t nbytes, scope RefAddress sender)
	@safe nothrow {
		callback(io_status, nbytes, sender);
	}

	eventDriver.sockets.receive(socket.m_fd, buffer, mode, &onReceived);
}
/** Forwards to the driver's `cancelReceive` for the descriptor held by `socket`.

	Marked `@safe nothrow` like `cancelRead`/`cancelWrite`, so that it can be
	called from `@safe` and `nothrow` code; as a non-template function its
	attributes are not inferred.
*/
void cancelReceive(ref DatagramSocket socket)
@safe nothrow {
	eventDriver.sockets.cancelReceive(socket.m_fd);
}
/** Issues a send of `buffer` on `socket` using the given `mode`.

	`target_address` (default `null`) is forwarded unchanged to the driver.
	`callback` is invoked as `callback(status, nbytes)`; the descriptor and
	address arguments supplied by the driver are not forwarded.
*/
void send(alias callback)(ref DatagramSocket socket, const(ubyte)[] buffer, IOMode mode, Address target_address = null)
{
	void onSent(DatagramSocketFD, IOStatus io_status, size_t nbytes, scope RefAddress)
	@safe nothrow {
		callback(io_status, nbytes);
	}

	eventDriver.sockets.send(socket.m_fd, buffer, mode, target_address, &onSent);
}
/** Forwards to the driver's `cancelSend` for the descriptor held by `socket`.

	Marked `@safe nothrow` like `cancelRead`/`cancelWrite`, so that it can be
	called from `@safe` and `nothrow` code; as a non-template function its
	attributes are not inferred.
*/
void cancelSend(ref DatagramSocket socket)
@safe nothrow {
	eventDriver.sockets.cancelSend(socket.m_fd);
}