1 module eventcore.drivers.posix.sockets;
2 @safe:
3 
4 import eventcore.driver;
5 import eventcore.drivers.posix.driver;
6 import eventcore.internal.utils;
7 
8 import std.algorithm.comparison : among, min, max;
9 import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;
10 
11 import core.time: Duration;
12 
13 version (Posix) {
14 	import std.socket : UnixAddress;
15 	import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
16 	import core.sys.posix.netinet.in_;
17 	import core.sys.posix.netinet.tcp;
18 	import core.sys.posix.sys.un;
19 	import core.sys.posix.unistd : close, read, write;
20 	import core.stdc.errno;
21 	import core.sys.posix.fcntl;
22 	import core.sys.posix.sys.socket;
23 
24 	version (linux) enum SO_REUSEPORT = 15;
25 	else enum SO_REUSEPORT = 0x200;
26 
27 	static if (!is(typeof(O_CLOEXEC)))
28 	{
29 		version (linux) enum O_CLOEXEC = 0x80000;
30 		else version (FreeBSD) enum O_CLOEXEC = 0x100000;
31 		else version (Solaris) enum O_CLOEXEC = 0x800000;
32 		else version (DragonFlyBSD) enum O_CLOEXEC = 0x0020000;
33 		else version (NetBSD) enum O_CLOEXEC = 0x400000;
34 		else version (OpenBSD) enum O_CLOEXEC = 0x10000;
35 		else version (OSX) enum O_CLOEXEC = 0x1000000;
36 	}
37 }
38 version (linux) {
39 	extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc;
40 	static if (!is(typeof(SOCK_NONBLOCK)))
41 		enum SOCK_NONBLOCK = 0x800;
42 	static if (!is(typeof(SOCK_CLOEXEC)))
43 		enum SOCK_CLOEXEC = 0x80000;
44 
45     import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
46 
47 	// Linux-specific TCP options
48 	// https://github.com/torvalds/linux/blob/master/include/uapi/linux/tcp.h#L95
49 	// Some day we should siply import core.sys.linux.netinet.tcp;
50 	static if (!is(typeof(SOL_TCP)))
51 		enum SOL_TCP = 6;
52 	static if (!is(typeof(TCP_KEEPIDLE)))
53 		enum TCP_KEEPIDLE = 4;
54 	static if (!is(typeof(TCP_KEEPINTVL)))
55 		enum TCP_KEEPINTVL = 5;
56 	static if (!is(typeof(TCP_KEEPCNT)))
57 		enum TCP_KEEPCNT = 6;
58 	static if (!is(typeof(TCP_USER_TIMEOUT)))
59 		enum TCP_USER_TIMEOUT = 18;
60 }
61 version (OSX) {
62     import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
63 
64 	static if (!is(typeof(ESHUTDOWN))) enum ESHUTDOWN = 58;
65 }
66 version (FreeBSD) {
67     import core.sys.freebsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
68 }
69 version (DragonFlyBSD) {
70 	import core.sys.dragonflybsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
71 }
72 version (Solaris) {
73 	enum IP_ADD_MEMBERSHIP = 0x13;
74 	enum IP_MULTICAST_LOOP = 0x12;
75 }
76 version (Windows) {
77 	import core.sys.windows.windows;
78 	import core.sys.windows.winsock2;
79 	alias sockaddr_storage = SOCKADDR_STORAGE;
80 
81 	alias EAGAIN = WSAEWOULDBLOCK;
82 	alias ECONNREFUSED = WSAECONNREFUSED;
83 	alias EPIPE = WSAECONNABORTED;
84 	alias ECONNRESET = WSAECONNRESET;
85 	alias ENETRESET = WSAENETRESET;
86 	alias ENOTCONN = WSAENOTCONN;
87 	alias ETIMEDOUT = WSAETIMEDOUT;
88 	alias ESHUTDOWN = WSAESHUTDOWN;
89 
90 	enum SHUT_RDWR = SD_BOTH;
91 	enum SHUT_RD = SD_RECEIVE;
92 	enum SHUT_WR = SD_SEND;
93 
94 	extern (C) int read(int fd, void *buffer, uint count) nothrow;
95 	extern (C) int write(int fd, const(void) *buffer, uint count) nothrow;
96 	extern (C) int close(int fd) nothrow @safe;
97 }
98 
99 version (Android) {
100 	static if (!is(typeof(MSG_NOSIGNAL)))
101 		enum MSG_NOSIGNAL = 0x4000;
102 }
103 
104 version (Posix) {
105 	version (OSX) {
106 		enum SEND_FLAGS = 0;
107 	} else {
108 		enum SEND_FLAGS = MSG_NOSIGNAL;
109 	}
110 } else {
111 	enum SEND_FLAGS = 0;
112 }
113 
114 
115 final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets {
116 @safe: /*@nogc:*/ nothrow:
117 	private Loop m_loop;
118 
119 	this(Loop loop) { m_loop = loop; }
120 
121 	final override StreamSocketFD connectStream(scope Address address, scope Address bind_address, ConnectCallback on_connect)
122 	{
123 		assert(on_connect !is null);
124 
125 		// @trusted to escape DIP1000's `scope` check
126 		auto sockfd = () @trusted { return createSocket(address.addressFamily, SOCK_STREAM); }();
127 		if (sockfd == -1) {
128 			on_connect(StreamSocketFD.invalid, ConnectStatus.socketCreateFailure);
129 			return StreamSocketFD.invalid;
130 		}
131 
132 		int bret;
133 		if (bind_address !is null)
134 			() @trusted { bret = bind(sockfd, bind_address.name, bind_address.nameLen); } ();
135 
136 		if (bret != 0) {
137 			closeSocket(sockfd);
138 			on_connect(StreamSocketFD.invalid, ConnectStatus.bindFailure);
139 			return StreamSocketFD.invalid;
140 		}
141 
142 		auto sock = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init);
143 		m_loop.registerFD(sock, EventMask.read|EventMask.write);
144 
145 		auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
146 		if (ret == 0) {
147 			m_loop.m_fds[sock].streamSocket.state = ConnectionState.connected;
148 			on_connect(sock, ConnectStatus.connected);
149 		} else {
150 			auto err = getSocketError();
151 			if (err.among!(EAGAIN, EINPROGRESS)) {
152 				with (m_loop.m_fds[sock].streamSocket) {
153 					connectCallback = on_connect;
154 					state = ConnectionState.connecting;
155 				}
156 				m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
157 			} else {
158 				m_loop.unregisterFD(sock, EventMask.read|EventMask.write);
159 				m_loop.clearFD!StreamSocketSlot(sock);
160 				closeSocket(sockfd);
161 				on_connect(StreamSocketFD.invalid, determineConnectStatus(err));
162 				return StreamSocketFD.invalid;
163 			}
164 		}
165 
166 		return sock;
167 	}
168 
169 	final override void cancelConnectStream(StreamSocketFD sock)
170 	{
171 		if (!isValid(sock)) return;
172 
173 		with (m_loop.m_fds[sock].streamSocket)
174 		{
175 			assert(state == ConnectionState.connecting,
176 				"Unable to cancel connect on the socket that is not in connecting state");
177 			state = ConnectionState.closed;
178 			connectCallback = null;
179 			m_loop.setNotifyCallback!(EventType.write)(sock, null);
180 		}
181 	}
182 
183 	final override StreamSocketFD adoptStream(int socket)
184 	{
185 		if (m_loop.m_fds[socket].common.refCount) // FD already in use?
186 			return StreamSocketFD.invalid;
187 		setSocketNonBlocking(socket);
188 		auto fd = m_loop.initFD!StreamSocketFD(socket, FDFlags.none, StreamSocketSlot.init);
189 		m_loop.registerFD(fd, EventMask.read|EventMask.write);
190 		return fd;
191 	}
192 
193 	private void onConnect(FD fd)
194 	{
195 		auto sock = cast(StreamSocketFD)fd;
196 		auto l = lockHandle(sock);
197 		m_loop.setNotifyCallback!(EventType.write)(sock, null);
198 
199 		ConnectStatus status = ConnectStatus.unknownError;
200 		int err;
201 		socklen_t errlen = err.sizeof;
202 		if (() @trusted { return getsockopt(cast(sock_t)fd, SOL_SOCKET, SO_ERROR, &err, &errlen); } () == 0)
203 			status = determineConnectStatus(err);
204 
205 		with (m_loop.m_fds[sock].streamSocket) {
206 			assert(state == ConnectionState.connecting);
207 
208 			state = status == ConnectStatus.connected
209 				? ConnectionState.connected
210 				: ConnectionState.closed;
211 
212 			auto cb = connectCallback;
213 			connectCallback = null;
214 			if (cb) cb(cast(StreamSocketFD)sock, status);
215 		}
216 	}
217 
218 	private ConnectStatus determineConnectStatus(int sock_err)
219 	{
220 		switch (sock_err) {
221 			default: return ConnectStatus.unknownError;
222 			case 0: return ConnectStatus.connected;
223 			case ECONNREFUSED: return ConnectStatus.refused;
224 		}
225 	}
226 
227 	alias listenStream = EventDriverSockets.listenStream;
228 	final override StreamListenSocketFD listenStream(scope Address address, StreamListenOptions options, AcceptCallback on_accept)
229 	{
230 		// @trusted to escape DIP1000's `scope` check
231 		auto sockfd = () @trusted { return createSocket(address.addressFamily, SOCK_STREAM); }();
232 		if (sockfd == -1) return StreamListenSocketFD.invalid;
233 
234 		auto succ = () @trusted {
235 			int tmp_reuse = 1;
236 			// FIXME: error handling!
237 			if (options & StreamListenOptions.reuseAddress) {
238 				if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0)
239 					return false;
240 			}
241 
242 			version (Windows) {}
243 			else {
244 				if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0)
245 					return false;
246 			}
247 
248 			if (bind(sockfd, address.name, address.nameLen) != 0)
249 				return false;
250 
251 			if (listen(sockfd, getBacklogSize()) != 0)
252 				return false;
253 
254 			return true;
255 		} ();
256 
257 		if (!succ) {
258 			closeSocket(sockfd);
259 			return StreamListenSocketFD.invalid;
260 		}
261 
262 		auto sock = m_loop.initFD!StreamListenSocketFD(sockfd, FDFlags.none, StreamListenSocketSlot.init);
263 
264 		if (on_accept) waitForConnections(sock, on_accept);
265 
266 		return sock;
267 	}
268 
269 	final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
270 	{
271 		if (!isValid(sock)) return;
272 
273 		m_loop.registerFD(sock, EventMask.read, false);
274 		m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
275 		m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept);
276 		onAccept(sock);
277 	}
278 
279 	private void onAccept(FD listenfd)
280 	{
281 		sock_t sockfd;
282 		sockaddr_storage addr;
283 		socklen_t addr_len = addr.sizeof;
284 		version (linux) {
285 			() @trusted { sockfd = accept4(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); } ();
286 			if (sockfd == -1) return;
287 		} else {
288 			() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
289 			if (sockfd == -1) return;
290 			setSocketNonBlocking(sockfd, true);
291 		}
292 		auto fd = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init);
293 		m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
294 		m_loop.registerFD(fd, EventMask.read|EventMask.write);
295 		//print("accept %d", sockfd);
296 		scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
297 		m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
298 	}
299 
300 	ConnectionState getConnectionState(StreamSocketFD sock)
301 	{
302 		if (!isValid(sock)) return ConnectionState.closed;
303 		return m_loop.m_fds[sock].streamSocket.state;
304 	}
305 
306 	final override bool getLocalAddress(SocketFD sock, scope RefAddress dst)
307 	{
308 		if (!isValid(sock)) return false;
309 
310 		socklen_t addr_len = dst.nameLen;
311 		if (() @trusted { return getsockname(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
312 			return false;
313 		dst.cap(addr_len);
314 		return true;
315 	}
316 
317 	final override bool getRemoteAddress(SocketFD sock, scope RefAddress dst)
318 	{
319 		if (!isValid(sock)) return false;
320 
321 		socklen_t addr_len = dst.nameLen;
322 		if (() @trusted { return getpeername(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
323 			return false;
324 		dst.cap(addr_len);
325 		return true;
326 	}
327 
328 	final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
329 	{
330 		if (!isValid(socket)) return;
331 
332 		int opt = enable;
333 		() @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
334 	}
335 
336 	override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted
337 	{
338 		if (!isValid(socket)) return;
339 
340 		int opt = enable;
341 		int err = setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, &opt, int.sizeof);
342 		if (err != 0)
343 			print("sock error in setKeepAlive: %s", getSocketError);
344 	}
345 
346 	override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
347 	{
348 		if (!isValid(socket)) return;
349 
350 		// dunnno about BSD\OSX, maybe someone should fix it for them later
351 		version (linux) {
352 			setKeepAlive(socket, true);
353 			int int_opt = cast(int) idle.total!"seconds"();
354 			int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPIDLE, &int_opt, int.sizeof);
355 			if (err != 0) {
356 				print("sock error on setsockopt TCP_KEEPIDLE: %s", getSocketError);
357 				return;
358 			}
359 			int_opt = cast(int) interval.total!"seconds"();
360 			err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPINTVL, &int_opt, int.sizeof);
361 			if (err != 0) {
362 				print("sock error on setsockopt TCP_KEEPINTVL: %s", getSocketError);
363 				return;
364 			}
365 			err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPCNT, &probeCount, int.sizeof);
366 			if (err != 0)
367 				print("sock error on setsockopt TCP_KEEPCNT: %s", getSocketError);
368 		}
369 	}
370 
371 	override void setUserTimeout(StreamSocketFD socket, Duration timeout) @trusted
372 	{
373 		if (!isValid(socket)) return;
374 
375 		version (linux) {
376 			uint tmsecs = cast(uint) timeout.total!"msecs";
377 			int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_USER_TIMEOUT, &tmsecs, uint.sizeof);
378 			if (err != 0)
379 				print("sock error on setsockopt TCP_USER_TIMEOUT %s", getSocketError);
380 		}
381 	}
382 
383 	final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
384 	{
385 		if (!isValid(socket)) {
386 			on_read_finish(socket, IOStatus.invalidHandle, 0);
387 			return;
388 		}
389 
390 		/*if (buffer.length == 0) {
391 			on_read_finish(socket, IOStatus.ok, 0);
392 			return;
393 		}*/
394 
395 		sizediff_t ret;
396 		() @trusted { ret = .recv(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0); } ();
397 
398 		if (ret < 0) {
399 			auto err = getSocketError();
400 			if (err.among!(EAGAIN, EINPROGRESS)) {
401 				if (mode == IOMode.immediate) {
402 					on_read_finish(socket, IOStatus.wouldBlock, 0);
403 					return;
404 				}
405 			} else {
406 				auto st = handleReadError(err, m_loop.m_fds[socket].streamSocket);
407 				print("sock error %s!", err);
408 				on_read_finish(socket, st, 0);
409 				return;
410 			}
411 		}
412 
413 		if (ret == 0 && buffer.length > 0) {
414 			// treat as if the connection read end was shut down
415 			handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
416 			on_read_finish(socket, IOStatus.disconnected, 0);
417 			return;
418 		}
419 
420 		if (ret >= 0) {
421 			buffer = buffer[ret .. $];
422 			if (mode != IOMode.all || buffer.length == 0) {
423 				on_read_finish(socket, IOStatus.ok, ret);
424 				return;
425 			}
426 		}
427 
428 		// NOTE: since we know that not all data was read from the stream
429 		//       socket, the next call to recv is guaranteed to return EGAIN
430 		//       and we can avoid that call.
431 
432 		with (m_loop.m_fds[socket].streamSocket) {
433 			readCallback = on_read_finish;
434 			readMode = mode;
435 			bytesRead = ret > 0 ? ret : 0;
436 			readBuffer = buffer;
437 		}
438 
439 		m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketRead);
440 	}
441 
442 	override void cancelRead(StreamSocketFD socket)
443 	{
444 		if (!isValid(socket)) return;
445 
446 		assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
447 		m_loop.setNotifyCallback!(EventType.read)(socket, null);
448 		with (m_loop.m_fds[socket].streamSocket) {
449 			readBuffer = null;
450 		}
451 	}
452 
453 	private void onSocketRead(FD fd)
454 	{
455 		auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
456 		auto socket = cast(StreamSocketFD)fd;
457 
458 		void finalize()(IOStatus status)
459 		{
460 			auto l = lockHandle(socket);
461 			m_loop.setNotifyCallback!(EventType.read)(socket, null);
462 			assert(m_loop.m_fds[socket].common.refCount > 0);
463 			//m_fds[fd].readBuffer = null;
464 			slot.readCallback(socket, status, slot.bytesRead);
465 			assert(m_loop.m_fds[socket].common.refCount > 0);
466 		}
467 
468 		while (true) {
469 			sizediff_t ret = 0;
470 			() @trusted { ret = .recv(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0); } ();
471 			if (ret < 0) {
472 				auto err = getSocketError();
473 				if (!err.among!(EAGAIN, EINPROGRESS)) {
474 					auto st = handleReadError(err, *slot);
475 					finalize(st);
476 					return;
477 				}
478 			}
479 
480 			if (ret == 0 && slot.readBuffer.length) {
481 				// treat as if the connection read end was shut down
482 				handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
483 				finalize(IOStatus.disconnected);
484 				return;
485 			}
486 
487 			if (ret > 0 || !slot.readBuffer.length) {
488 				slot.bytesRead += ret;
489 				slot.readBuffer = slot.readBuffer[ret .. $];
490 				if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
491 					finalize(IOStatus.ok);
492 					return;
493 				}
494 			}
495 
496 			// retry if this was just a partial read, as it could mean that
497 			// the connection was closed by the remove peer
498 			if (ret <= 0 || !slot.readBuffer.length) break;
499 		}
500 	}
501 
502 	private static IOStatus handleReadError(int err, ref StreamSocketSlot slot)
503 	@safe nothrow {
504 		switch (err) {
505 			case 0: return IOStatus.ok;
506 			case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
507 				slot.state = ConnectionState.closed;
508 				return IOStatus.disconnected;
509 			case ESHUTDOWN:
510 				if (slot.state == ConnectionState.activeClose)
511 					slot.state = ConnectionState.closed;
512 				else if (slot.state != ConnectionState.closed)
513 					slot.state = ConnectionState.passiveClose;
514 				return IOStatus.disconnected;
515 			default: return IOStatus.error;
516 		}
517 	}
518 
519 
520 	final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
521 	{
522 		if (!isValid(socket)) {
523 			on_write_finish(socket, IOStatus.invalidHandle, 0);
524 			return;
525 		}
526 
527 		if (buffer.length == 0) {
528 			on_write_finish(socket, IOStatus.ok, 0);
529 			return;
530 		}
531 
532 		sizediff_t ret;
533 		() @trusted { ret = .send(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS); } ();
534 
535 		if (ret < 0) {
536 			auto err = getSocketError();
537 			if (err.among!(EAGAIN, EINPROGRESS)) {
538 				if (mode == IOMode.immediate) {
539 					on_write_finish(socket, IOStatus.wouldBlock, 0);
540 					return;
541 				}
542 			} else {
543 				auto st = handleWriteError(err, m_loop.m_fds[socket].streamSocket);
544 				on_write_finish(socket, st, 0);
545 				return;
546 			}
547 		}
548 
549 		size_t bytes_written = 0;
550 
551 		if (ret >= 0) {
552 			bytes_written += ret;
553 			buffer = buffer[ret .. $];
554 			if (mode != IOMode.all || buffer.length == 0) {
555 				on_write_finish(socket, IOStatus.ok, bytes_written);
556 				return;
557 			}
558 		}
559 
560 		// NOTE: since we know that not all data was writtem to the stream
561 		//       socket, the next call to send is guaranteed to return EGAIN
562 		//       and we can avoid that call.
563 
564 		with (m_loop.m_fds[socket].streamSocket) {
565 			writeCallback = on_write_finish;
566 			writeMode = mode;
567 			bytesWritten = ret >= 0 ? ret : 0;
568 			writeBuffer = buffer;
569 		}
570 
571 		m_loop.setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
572 	}
573 
574 	override void cancelWrite(StreamSocketFD socket)
575 	{
576 		if (!isValid(socket)) return;
577 
578 		assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
579 		m_loop.setNotifyCallback!(EventType.write)(socket, null);
580 		m_loop.m_fds[socket].streamSocket.writeBuffer = null;
581 	}
582 
583 	private void onSocketWrite(FD fd)
584 	{
585 		auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
586 		auto socket = cast(StreamSocketFD)fd;
587 
588 		sizediff_t ret;
589 		() @trusted { ret = .send(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), SEND_FLAGS); } ();
590 
591 		if (ret < 0) {
592 			auto err = getSocketError();
593 			if (!err.among!(EAGAIN, EINPROGRESS)) {
594 				auto l = lockHandle(socket);
595 				m_loop.setNotifyCallback!(EventType.write)(socket, null);
596 				auto st = handleWriteError(err, *slot);
597 				slot.writeCallback(socket, st, slot.bytesRead);
598 				return;
599 			}
600 		}
601 
602 		if (ret >= 0) {
603 			slot.bytesWritten += ret;
604 			slot.writeBuffer = slot.writeBuffer[ret .. $];
605 			if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
606 				auto l = lockHandle(socket);
607 				m_loop.setNotifyCallback!(EventType.write)(socket, null);
608 				slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
609 				return;
610 			}
611 		}
612 	}
613 
614 	private static IOStatus handleWriteError(int err, ref StreamSocketSlot slot)
615 	@safe nothrow {
616 		switch (err) {
617 			case 0: return IOStatus.ok;
618 			case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
619 				slot.state = ConnectionState.closed;
620 				return IOStatus.disconnected;
621 			case ESHUTDOWN:
622 				if (slot.state == ConnectionState.passiveClose)
623 					slot.state = ConnectionState.closed;
624 				else if (slot.state != ConnectionState.closed)
625 					slot.state = ConnectionState.activeClose;
626 				return IOStatus.disconnected;
627 			default: return IOStatus.error;
628 		}
629 	}
630 
631 
632 	final override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
633 	{
634 		if (!isValid(socket)) {
635 			on_data_available(socket, IOStatus.invalidHandle, 0);
636 			return;
637 		}
638 
639 		sizediff_t ret;
640 		ubyte dummy;
641 		() @trusted { ret = recv(cast(sock_t)socket, &dummy, 1, MSG_PEEK); } ();
642 
643 		if (ret < 0) {
644 			auto err = getSocketError();
645 			if (!err.among!(EAGAIN, EINPROGRESS)) {
646 				on_data_available(socket, IOStatus.error, 0);
647 				return;
648 			}
649 		}
650 
651 		size_t bytes_read = 0;
652 
653 		if (ret == 0) {
654 			on_data_available(socket, IOStatus.disconnected, 0);
655 			return;
656 		}
657 
658 		if (ret > 0) {
659 			on_data_available(socket, IOStatus.ok, 0);
660 			return;
661 		}
662 
663 		with (m_loop.m_fds[socket].streamSocket) {
664 			readCallback = on_data_available;
665 			readMode = IOMode.once;
666 			bytesRead = 0;
667 			readBuffer = null;
668 		}
669 
670 		m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable);
671 	}
672 
673 	private void onSocketDataAvailable(FD fd)
674 	{
675 		auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
676 		auto socket = cast(StreamSocketFD)fd;
677 
678 		void finalize()(IOStatus status)
679 		{
680 			auto l = lockHandle(socket);
681 			m_loop.setNotifyCallback!(EventType.read)(socket, null);
682 			//m_fds[fd].readBuffer = null;
683 			slot.readCallback(socket, status, 0);
684 		}
685 
686 		sizediff_t ret;
687 		ubyte tmp;
688 		() @trusted { ret = recv(cast(sock_t)socket, &tmp, 1, MSG_PEEK); } ();
689 		if (ret < 0) {
690 			auto err = getSocketError();
691 			if (!err.among!(EAGAIN, EINPROGRESS)) finalize(IOStatus.error);
692 		} else finalize(ret ? IOStatus.ok : IOStatus.disconnected);
693 	}
694 
695 	final override void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write)
696 	{
697 		if (!isValid(socket)) return;
698 
699 		auto st = m_loop.m_fds[socket].streamSocket.state;
700 		() @trusted { .shutdown(cast(sock_t)socket, shut_read ? shut_write ? SHUT_RDWR : SHUT_RD : shut_write ? SHUT_WR : 0); } ();
701 		if (st == ConnectionState.passiveClose) shut_read = true;
702 		if (st == ConnectionState.activeClose) shut_write = true;
703 		m_loop.m_fds[socket].streamSocket.state = shut_read ? shut_write ? ConnectionState.closed : ConnectionState.passiveClose : shut_write ? ConnectionState.activeClose : ConnectionState.connected;
704 	}
705 
706 	final override DatagramSocketFD createDatagramSocket(scope Address bind_address,
707 		scope Address target_address, DatagramCreateOptions options = DatagramCreateOptions.init)
708 	{
709 		return createDatagramSocketInternal(bind_address, target_address, options, false);
710 	}
711 
712 	package DatagramSocketFD createDatagramSocketInternal(scope Address bind_address,
713 		scope Address target_address, DatagramCreateOptions options = DatagramCreateOptions.init,
714 		bool is_internal = true)
715 	{
716 		// @trusted to escape DIP1000's `scope` check
717 		auto sockfd = () @trusted { return createSocket(bind_address.addressFamily, SOCK_DGRAM); }();
718 		if (sockfd == -1) return DatagramSocketFD.invalid;
719 
720 		auto optsucc = () @trusted {
721 			int tmp_reuse = 1;
722 			// FIXME: error handling!
723 			if (options & DatagramCreateOptions.reuseAddress) {
724 				if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0)
725 					return false;
726 			}
727 
728 			version (Windows) {}
729 			else {
730 				if ((options & DatagramCreateOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0)
731 					return false;
732 			}
733 
734 			return true;
735 		} ();
736 
737 		if (!optsucc) {
738 			closeSocket(sockfd);
739 			return DatagramSocketFD.init;
740 		}
741 
742 
743 		if (bind_address && () @trusted { return bind(sockfd, bind_address.name, bind_address.nameLen); } () != 0) {
744 			closeSocket(sockfd);
745 			return DatagramSocketFD.init;
746 		}
747 
748 		if (target_address) {
749 			int ret;
750 			if (target_address is bind_address) {
751 				// special case of bind_address==target_address: determine the actual bind address
752 				// in case of a zero port
753 				sockaddr_storage sa;
754 				socklen_t addr_len = sa.sizeof;
755 				if (() @trusted { return getsockname(sockfd, cast(sockaddr*)&sa, &addr_len); } () != 0) {
756 					closeSocket(sockfd);
757 					return DatagramSocketFD.init;
758 				}
759 
760 				ret = () @trusted { return connect(sockfd, cast(sockaddr*)&sa, addr_len); } ();
761 			} else ret = () @trusted { return connect(sockfd, target_address.name, target_address.nameLen); } ();
762 
763 			if (ret != 0) {
764 				closeSocket(sockfd);
765 				return DatagramSocketFD.init;
766 			}
767 		}
768 
769 		auto flags = is_internal ? FDFlags.internal : FDFlags.none;
770 		auto sock = m_loop.initFD!DatagramSocketFD(sockfd, flags, DgramSocketSlot.init);
771 		m_loop.registerFD(sock, EventMask.read|EventMask.write);
772 		return sock;
773 	}
774 
775 	final override DatagramSocketFD adoptDatagramSocket(int socket)
776 	{
777 		return adoptDatagramSocketInternal(socket, false);
778 	}
779 
780 	package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false)
781 	@nogc {
782 		if (m_loop.m_fds[socket].common.refCount) // FD already in use?
783 			return DatagramSocketFD.init;
784 		setSocketNonBlocking(socket, close_on_exec);
785 		auto flags = is_internal ? FDFlags.internal : FDFlags.none;
786 		auto fd = m_loop.initFD!DatagramSocketFD(socket, flags, DgramSocketSlot.init);
787 		m_loop.registerFD(fd, EventMask.read|EventMask.write);
788 		return fd;
789 	}
790 
791 	final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
792 	{
793 		if (!isValid(socket)) return;
794 
795 		() @trusted { connect(cast(sock_t)socket, target_address.name, target_address.nameLen); } ();
796 	}
797 
798 	final override bool setBroadcast(DatagramSocketFD socket, bool enable)
799 	{
800 		if (!isValid(socket)) return false;
801 
802 		int tmp_broad = enable;
803 		return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
804 	}
805 
806 	final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0)
807 	{
808 		if (!isValid(socket)) return false;
809 
810 		// @trusted to escape DIP1000's `scope` check
811 		switch (() @trusted { return multicast_address.addressFamily; }()) {
812 			default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
813 			case AddressFamily.INET:
814 				struct ip_mreq {
815 					in_addr imr_multiaddr;   /* IP multicast address of group */
816 					in_addr imr_interface;   /* local IP address of interface */
817 				}
818 				auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } ();
819 				ip_mreq mreq;
820 				mreq.imr_multiaddr = addr.sin_addr;
821 				mreq.imr_interface.s_addr = htonl(interface_index);
822 				return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0;
823 			case AddressFamily.INET6:
824 				version (Windows) {
825 					struct ipv6_mreq {
826 						in6_addr ipv6mr_multiaddr;
827 						uint ipv6mr_interface;
828 					}
829 				}
830 				auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } ();
831 				ipv6_mreq mreq;
832 				mreq.ipv6mr_multiaddr = addr.sin6_addr;
833 
834 				version (Android) {
835 					// ipv6mr_interface is defined as ipv6mr_ifindex on android
836 					mreq.ipv6mr_ifindex = htonl(interface_index);
837 				} else {
838 					mreq.ipv6mr_interface = htonl(interface_index);
839 				}
840 				return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0;
841 		}
842 	}
843 
844 	void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish)
845 	@safe {
846 		assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
847 
848 		if (!isValid(socket)) {
849 			RefAddress addr;
850 			on_receive_finish(socket, IOStatus.invalidHandle, 0, addr);
851 			return;
852 		}
853 
854 		sizediff_t ret;
855 		sockaddr_storage src_addr;
856 		socklen_t src_addr_len = src_addr.sizeof;
857 		() @trusted { ret = .recvfrom(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();
858 
859 		if (ret < 0) {
860 			auto err = getSocketError();
861 			if (!err.among!(EAGAIN, EINPROGRESS)) {
862 				print("sock error %s for %s!", err, socket);
863 				on_receive_finish(socket, IOStatus.error, 0, null);
864 				return;
865 			}
866 
867 			if (mode == IOMode.immediate) {
868 				on_receive_finish(socket, IOStatus.wouldBlock, 0, null);
869 			} else {
870 				with (m_loop.m_fds[socket].datagramSocket) {
871 					readCallback = on_receive_finish;
872 					readMode = mode;
873 					bytesRead = 0;
874 					readBuffer = buffer;
875 				}
876 
877 				m_loop.setNotifyCallback!(EventType.read)(socket, &onDgramRead);
878 			}
879 			return;
880 		}
881 
882 		scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr_len);
883 		if (ret == 0) {
884 			on_receive_finish(socket, IOStatus.disconnected, 0, src_addrc);
885 		} else {
886 			on_receive_finish(socket, IOStatus.ok, ret, src_addrc);
887 		}
888 	}
889 
890 	package void receiveNoGC(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress) @safe nothrow @nogc on_receive_finish)
891 	@trusted @nogc {
892 		scope void delegate() @safe nothrow do_it = {
893 			receive(socket, buffer, mode, on_receive_finish);
894 		};
895 		(cast(void delegate() @safe nothrow @nogc)do_it)();
896 	}
897 
898 	void cancelReceive(DatagramSocketFD socket)
899 	@nogc {
900 		if (!isValid(socket)) return;
901 
902 		auto slot = () @trusted { return &m_loop.m_fds[socket].datagramSocket(); } ();
903 		if (slot.readCallback is null) return;
904 
905 		m_loop.setNotifyCallback!(EventType.read)(socket, null);
906 		slot.readCallback = null;
907 		slot.readBuffer = null;
908 	}
909 
910 	private void onDgramRead(FD fd)
911 	@safe {
912 		auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
913 		auto socket = cast(DatagramSocketFD)fd;
914 
915 		sizediff_t ret;
916 		sockaddr_storage src_addr;
917 		socklen_t src_addr_len = src_addr.sizeof;
918 		() @trusted { ret = .recvfrom(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();
919 
920 		if (ret < 0) {
921 			auto err = getSocketError();
922 			if (!err.among!(EAGAIN, EINPROGRESS)) {
923 				auto l = lockHandle(socket);
924 				m_loop.setNotifyCallback!(EventType.read)(socket, null);
925 				slot.readCallback(socket, IOStatus.error, 0, null);
926 				return;
927 			}
928 		}
929 
930 		auto l = lockHandle(socket);
931 		m_loop.setNotifyCallback!(EventType.read)(socket, null);
932 		scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr.sizeof);
933 		auto cb = () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ();
934 		slot.readCallback = null;
935 		cb(socket, IOStatus.ok, ret, src_addrc);
936 	}
937 
938 	void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish)
939 	{
940 		assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");
941 
942 		if (!isValid(socket)) {
943 			RefAddress addr;
944 			on_send_finish(socket, IOStatus.invalidHandle, 0, addr);
945 			return;
946 		}
947 
948 		sizediff_t ret;
949 		if (target_address) {
950 			() @trusted { ret = .sendto(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS, target_address.name, target_address.nameLen); } ();
951 			m_loop.m_fds[socket].datagramSocket.targetAddr = target_address;
952 		} else {
953 			() @trusted { ret = .send(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS); } ();
954 		}
955 
956 		if (ret < 0) {
957 			auto err = getSocketError();
958 			if (!err.among!(EAGAIN, EINPROGRESS)) {
959 				print("sock error %s!", err);
960 				on_send_finish(socket, IOStatus.error, 0, null);
961 				return;
962 			}
963 
964 			if (mode == IOMode.immediate) {
965 				on_send_finish(socket, IOStatus.wouldBlock, 0, null);
966 			} else {
967 				with (m_loop.m_fds[socket].datagramSocket) {
968 					writeCallback = on_send_finish;
969 					writeMode = mode;
970 					bytesWritten = 0;
971 					writeBuffer = buffer;
972 				}
973 
974 				m_loop.setNotifyCallback!(EventType.write)(socket, &onDgramWrite);
975 			}
976 			return;
977 		}
978 
979 		on_send_finish(socket, IOStatus.ok, ret, null);
980 	}
981 
982 	void cancelSend(DatagramSocketFD socket)
983 	{
984 		if (!isValid(socket)) return;
985 
986 		assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
987 		m_loop.setNotifyCallback!(EventType.write)(socket, null);
988 		m_loop.m_fds[socket].datagramSocket.writeBuffer = null;
989 	}
990 
991 	private void onDgramWrite(FD fd)
992 	{
993 		auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
994 		auto socket = cast(DatagramSocketFD)fd;
995 
996 		sizediff_t ret;
997 		if (slot.targetAddr) {
998 			() @trusted { ret = .sendto(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), SEND_FLAGS, slot.targetAddr.name, slot.targetAddr.nameLen); } ();
999 		} else {
1000 			() @trusted { ret = .send(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), SEND_FLAGS); } ();
1001 		}
1002 
1003 		if (ret < 0) {
1004 			auto err = getSocketError();
1005 			if (!err.among!(EAGAIN, EINPROGRESS)) {
1006 				auto l = lockHandle(socket);
1007 				m_loop.setNotifyCallback!(EventType.write)(socket, null);
1008 				() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
1009 				return;
1010 			}
1011 		}
1012 
1013 		auto l = lockHandle(socket);
1014 		m_loop.setNotifyCallback!(EventType.write)(socket, null);
1015 		() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
1016 	}
1017 
1018 	final override bool isValid(SocketFD handle)
1019 	const {
1020 		if (handle.value > m_loop.m_fds.length) return false;
1021 		return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
1022 	}
1023 
1024 	final override void addRef(SocketFD fd)
1025 	{
1026 		if (!isValid(fd)) return;
1027 
1028 		auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
1029 		assert(slot.common.refCount > 0, "Adding reference to unreferenced socket FD.");
1030 		slot.common.refCount++;
1031 	}
1032 
1033 	final override bool releaseRef(SocketFD fd)
1034 	@nogc {
1035 		import taggedalgebraic : hasType;
1036 
1037 		if (!isValid(fd)) return true;
1038 
1039 		auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
1040 		nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
1041 		// listening sockets have an incremented the reference count because of setNotifyCallback
1042 		int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0;
1043 		if (--slot.common.refCount == base_refcount) {
1044 			m_loop.unregisterFD(fd, EventMask.read|EventMask.write);
1045 			switch (slot.specific.kind) with (slot.specific.Kind) {
1046 				default: assert(false, "File descriptor slot is not a socket.");
1047 				case streamSocket:
1048 					m_loop.clearFD!StreamSocketSlot(fd);
1049 					break;
1050 				case streamListen:
1051 					m_loop.setNotifyCallback!(EventType.read)(fd, null);
1052 					m_loop.clearFD!StreamListenSocketSlot(fd);
1053 					break;
1054 				case datagramSocket:
1055 					m_loop.clearFD!DgramSocketSlot(fd);
1056 					break;
1057 			}
1058 			closeSocket(cast(sock_t)fd);
1059 			return false;
1060 		}
1061 		return true;
1062 	}
1063 
1064 	final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
1065 	{
1066 		if (!isValid(socket)) return false;
1067 
1068 		int proto, opt;
1069 		final switch (option) {
1070 			case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
1071 			case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break;
1072 		}
1073 		int tmp = enable;
1074 		return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
1075 	}
1076 
1077 	final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
1078 	{
1079 		if (!isValid(socket)) return false;
1080 
1081 		int proto, opt;
1082 		final switch (option) {
1083 			case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
1084 			case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break;
1085 		}
1086 		int tmp = enable;
1087 		return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
1088 	}
1089 
1090 	final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
1091 	@system {
1092 		if (!isValid(descriptor)) return null;
1093 		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
1094 	}
1095 
1096 	final protected override void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
1097 	@system {
1098 		if (!isValid(descriptor)) return null;
1099 		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
1100 	}
1101 
1102 	final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
1103 	@system {
1104 		if (!isValid(descriptor)) return null;
1105 		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
1106 	}
1107 
1108 	private sock_t createSocket(AddressFamily family, int type)
1109 	{
1110 		sock_t sock;
1111 		version (linux) {
1112 			() @trusted { sock = socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); } ();
1113 			if (sock == -1) return -1;
1114 		} else {
1115 			() @trusted { sock = socket(family, type, 0); } ();
1116 			if (sock == -1) return -1;
1117 			setSocketNonBlocking(sock, true);
1118 
1119 			// Prevent SIGPIPE on failed send
1120 			version (OSX) {
1121 				int val = 1;
1122 				() @trusted { setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &val, val.sizeof); } ();
1123 			}
1124 		}
1125 		return sock;
1126 	}
1127 
1128 	// keeps a scoped reference to a handle to avoid it getting destroyed
1129 	private auto lockHandle(H)(H handle)
1130 	{
1131 		addRef(handle);
1132 		static struct R {
1133 			PosixEventDriverSockets drv;
1134 			H handle;
1135 			@disable this(this);
1136 			~this() { drv.releaseRef(handle); }
1137 		}
1138 		return R(this, handle);
1139 	}
1140 }
1141 
1142 package struct StreamSocketSlot {
1143 	alias Handle = StreamSocketFD;
1144 
1145 	size_t bytesRead;
1146 	ubyte[] readBuffer;
1147 	IOMode readMode;
1148 	IOCallback readCallback; // FIXME: this type only works for stream sockets
1149 
1150 	size_t bytesWritten;
1151 	const(ubyte)[] writeBuffer;
1152 	IOMode writeMode;
1153 	IOCallback writeCallback; // FIXME: this type only works for stream sockets
1154 
1155 	ConnectCallback connectCallback;
1156 	ConnectionState state;
1157 }
1158 
1159 package struct StreamListenSocketSlot {
1160 	alias Handle = StreamListenSocketFD;
1161 
1162 	AcceptCallback acceptCallback;
1163 }
1164 
1165 package struct DgramSocketSlot {
1166 	alias Handle = DatagramSocketFD;
1167 	size_t bytesRead;
1168 	ubyte[] readBuffer;
1169 	IOMode readMode;
1170 	DatagramIOCallback readCallback; // FIXME: this type only works for stream sockets
1171 
1172 	size_t bytesWritten;
1173 	const(ubyte)[] writeBuffer;
1174 	IOMode writeMode;
1175 	DatagramIOCallback writeCallback; // FIXME: this type only works for stream sockets
1176 	Address targetAddr;
1177 }
1178 
1179 private void closeSocket(sock_t sockfd)
1180 @nogc nothrow {
1181 	version (Windows) () @trusted { closesocket(sockfd); } ();
1182 	else close(sockfd);
1183 }
1184 
1185 private void setSocketNonBlocking(SocketFD.BaseType sockfd, bool close_on_exec = false)
1186 @nogc nothrow {
1187 	version (Windows) {
1188 		uint enable = 1;
1189 		() @trusted { ioctlsocket(sockfd, FIONBIO, &enable); } ();
1190 	} else {
1191 		int f = O_NONBLOCK;
1192 		if (close_on_exec) f |= O_CLOEXEC;
1193 		() @trusted { fcntl(cast(int)sockfd, F_SETFL, f); } ();
1194 	}
1195 }
1196 
1197 private int getSocketError()
1198 @nogc nothrow {
1199 	version (Windows) return WSAGetLastError();
1200 	else return errno;
1201 }
1202 
1203 private int getBacklogSize()
1204 @trusted @nogc nothrow {
1205 	int backlog = 128;
1206 	version (linux)
1207 	{
1208 		import core.stdc.stdio : fclose, fopen, fscanf;
1209 		auto somaxconn = fopen("/proc/sys/net/core/somaxconn", "re");
1210 		if (somaxconn)
1211 		{
1212 			int tmp;
1213 			if (fscanf(somaxconn, "%d", &tmp) == 1)
1214 				backlog = tmp;
1215 			fclose(somaxconn);
1216 		}
1217 	}
1218 	return backlog;
1219 }