1 module eventcore.drivers.winapi.sockets;
2 
3 version (Windows):
4 
5 import eventcore.driver;
6 import eventcore.drivers.winapi.core;
7 import eventcore.internal.win32;
8 import eventcore.internal.utils : AlgebraicChoppedVector, print, nogc_assert;
9 import std.socket : Address;
10 
11 import core.time: Duration;
12 
/// Window message ID handed to `WSAAsyncSelect`, so that socket notifications
/// are posted to the driver's message window under this ID.
private enum WM_USER_SOCKET = WM_USER + 1;
14 
15 
16 final class WinAPIEventDriverSockets : EventDriverSockets {
17 @safe: /*@nogc:*/ nothrow:
	private {
		// Per-socket slot storage, indexed by socket handle. Each slot holds
		// the common state plus one of the stream/listen/datagram states.
		alias SocketVector = AlgebraicChoppedVector!(SocketSlot, StreamSocketSlot, StreamListenSocketSlot, DatagramSocketSlot);
		SocketVector m_sockets;
		// Reported by checkForLeakedHandles() when non-zero.
		size_t m_socketCount = 0;
		// Core driver; used for waiter accounting (addWaiter/removeWaiter)
		// and for discarding queued overlapped events.
		WinAPIEventDriverCore m_core;
		// ID of the thread that constructed this driver.
		DWORD m_tid;
		// Message-only window that is passed to WSAAsyncSelect.
		HWND m_hwnd;
		// Exposed through the waiterCount property.
		size_t m_waiters;
	}
27 
28 	this(WinAPIEventDriverCore core)
29 	@trusted @nogc {
30 		m_tid = GetCurrentThreadId();
31 		m_core = core;
32 
33 		// setup socket event message window
34 		setupWindowClass();
35 		m_hwnd = CreateWindowA("VibeWin32MessageWindow", "VibeWin32MessageWindow", 0, 0,0,0,0, HWND_MESSAGE,null,null,null);
36 		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, cast(ULONG_PTR)cast(void*)this);
37 		assert(cast(WinAPIEventDriverSockets)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is this);
38 	}
39 
40 	package @property size_t waiterCount() const { return m_waiters; }
41 
42 	package bool checkForLeakedHandles()
43 	{
44 		if (m_socketCount == 0) return false;
45 		print("Warning: %s socket handles leaked at driver shutdown.", m_socketCount);
46 		return true;
47 	}
48 
	/** Creates a TCP socket and starts connecting it to `peer_address`.

		If `bind_address` is non-null, the socket is bound to it first.
		`on_connect` is invoked synchronously for every outcome except a
		pending connect (`WSAEWOULDBLOCK`); in that case it is stored in the
		socket slot and a waiter is registered with the core driver.

		Returns: The new socket handle, or `StreamSocketFD.invalid` if socket
			creation, binding or the connect call failed.
	*/
	override StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect)
	@trusted {
		// The driver must be used from the thread that constructed it.
		assert(m_tid == GetCurrentThreadId());

		auto fd = WSASocketW(peer_address.addressFamily, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
		if (fd == INVALID_SOCKET) {
			on_connect(StreamSocketFD.invalid, ConnectStatus.socketCreateFailure);
			return StreamSocketFD.invalid;
		}

		// Closes the raw socket and marks the local handle as unusable.
		void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }

		// bret stays 0 (success) when no bind address was given.
		int bret;
		if (bind_address !is null)
			bret = bind(fd, bind_address.name, bind_address.nameLen);
		if (bret != 0) {
			invalidateSocket();
			on_connect(StreamSocketFD.invalid, ConnectStatus.bindFailure);
			return StreamSocketFD.invalid;
		}

		// Registers the slot and requests FD_CONNECT/FD_CLOSE notifications.
		// NOTE(review): adoptStreamInternal can return StreamSocketFD.invalid
		// (slot already in use); that result is not checked here - confirm.
		auto sock = adoptStreamInternal(fd, ConnectionState.connecting);

		auto ret = .connect(fd, peer_address.name, peer_address.nameLen);
		//auto ret = WSAConnect(m_socket, peer_address.name, peer_address.nameLen, null, null, null, null);

		// Connected immediately - report right away.
		if (ret == 0) {
			m_sockets[sock].specific.state = ConnectionState.connected;
			on_connect(sock, ConnectStatus.connected);
			return sock;
		}

		auto err = WSAGetLastError();
		if (err == WSAEWOULDBLOCK) {
			// Connect is in progress: keep the callback in the slot and
			// register a waiter with the core driver.
			with (m_sockets[sock].streamSocket) {
				connectCallback = on_connect;
			}

			m_core.addWaiter();
			return sock;
		} else {
			// Hard failure: undo the slot registration and close the socket.
			clearSocketSlot(sock);
			invalidateSocket();
			on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError);
			return StreamSocketFD.invalid;
		}
	}
96 
97 	final override void cancelConnectStream(StreamSocketFD sock)
98 	{
99 		if (!isValid(sock)) return;
100 
101 		with (m_sockets[sock].streamSocket) {
102 			assert(state == ConnectionState.connecting,
103 				"Must be in 'connecting' state when calling cancelConnection.");
104 
105 			state = ConnectionState.closed;
106 			connectCallback = null;
107 			m_core.removeWaiter();
108 		}
109 	}
110 
111 	override StreamSocketFD adoptStream(int socket)
112 	{
113 		return adoptStreamInternal(socket, ConnectionState.connected);
114 	}
115 
116 	private StreamSocketFD adoptStreamInternal(SOCKET socket, ConnectionState state)
117 	{
118 		if (m_sockets[socket].common.refCount) // FD already in use?
119 			return StreamSocketFD.invalid;
120 
121 		// done by wsaasyncselect
122 		//uint enable = 1;
123 		//() @trusted { ioctlsocket(socket, FIONBIO, &enable); } ();
124 
125 		void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
126 			overlapped.Internal = 0;
127 			overlapped.InternalHigh = 0;
128 			overlapped.Offset = 0;
129 			overlapped.OffsetHigh = 0;
130 			overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
131 			overlapped.driver = m_core;
132 		}
133 
134 		auto fd = initSocketSlot!StreamSocketFD(socket);
135 		with (m_sockets[socket]) {
136 			specific = StreamSocketSlot.init;
137 			streamSocket.state = state;
138 			setupOverlapped(streamSocket.write.overlapped);
139 			setupOverlapped(streamSocket.read.overlapped);
140 		}
141 
142 		() @trusted { WSAAsyncSelect(socket, m_hwnd, WM_USER_SOCKET, FD_CONNECT|FD_CLOSE); } ();
143 
144 		return fd;
145 	}
146 
147 	alias listenStream = EventDriverSockets.listenStream;
148 	override StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept)
149 	{
150 		auto fd = () @trusted { return WSASocketW(bind_address.addressFamily, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); } ();
151 		if (fd == INVALID_SOCKET)
152 			return StreamListenSocketFD.invalid;
153 
154 		void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }
155 
156 		() @trusted {
157 			if (options & StreamListenOptions.reuseAddress) {
158 				int tmp_reuse = 1;
159 				if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
160 					invalidateSocket();
161 					return;
162 				}
163 			}
164 
165 			// FIXME: should SO_EXCLUSIVEADDRUSE be used of StreamListenOptions.reuseAddress isn't set?
166 
167 			if (bind(fd, bind_address.name, bind_address.nameLen) != 0) {
168 				invalidateSocket();
169 				return;
170 			}
171 			if (listen(fd, 128) != 0) {
172 				invalidateSocket();
173 				return;
174 			}
175 		} ();
176 
177 		if (fd == INVALID_SOCKET)
178 			return StreamListenSocketFD.invalid;
179 
180 		auto sock = initSocketSlot!StreamListenSocketFD(fd);
181 		m_sockets[sock].specific = StreamListenSocketSlot.init;
182 
183 		if (on_accept) waitForConnections(sock, on_accept);
184 
185 		return sock;
186 	}
187 
188 	override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
189 	{
190 		if (!isValid(sock)) return;
191 
192 		assert(!m_sockets[sock].streamListen.acceptCallback);
193 		m_sockets[sock].streamListen.acceptCallback = on_accept;
194 		() @trusted { WSAAsyncSelect(sock, m_hwnd, WM_USER_SOCKET, FD_ACCEPT); } ();
195 		m_core.addWaiter();
196 	}
197 
198 	override ConnectionState getConnectionState(StreamSocketFD sock)
199 	{
200 		if (!isValid(sock)) return ConnectionState.closed;
201 
202 		return m_sockets[sock].streamSocket.state;
203 	}
204 
205 	override bool getLocalAddress(SocketFD sock, scope RefAddress dst)
206 	{
207 		if (!isValid(sock)) return false;
208 
209 		socklen_t addr_len = dst.nameLen;
210 		if (() @trusted { return getsockname(sock, dst.name, &addr_len); } () != 0)
211 			return false;
212 		dst.cap(addr_len);
213 		return true;
214 	}
215 
216 	override bool getRemoteAddress(SocketFD sock, scope RefAddress dst)
217 	{
218 		if (!isValid(sock)) return false;
219 
220 		socklen_t addr_len = dst.nameLen;
221 		if (() @trusted { return getpeername(sock, dst.name, &addr_len); } () != 0)
222 			return false;
223 		dst.cap(addr_len);
224 		return true;
225 	}
226 
227 	override void setTCPNoDelay(StreamSocketFD socket, bool enable)
228 	@trusted {
229 		if (!isValid(socket)) return;
230 
231 		BOOL eni = enable;
232 		setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &eni, eni.sizeof);
233 	}
234 
235 	override void setKeepAlive(StreamSocketFD socket, bool enable)
236 	@trusted {
237 		if (!isValid(socket)) return;
238 
239 		BOOL eni = enable;
240 		setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof);
241 	}
242 
243 	override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
244 	{
245 		if (!isValid(socket)) return;
246 
247 		tcp_keepalive opts = tcp_keepalive(1, cast(c_ulong) idle.total!"msecs"(),
248 			cast(c_ulong) interval.total!"msecs");
249 		int result = WSAIoctl(socket, SIO_KEEPALIVE_VALS, &opts, cast(DWORD) tcp_keepalive.sizeof,
250 			null, 0, null, null, null);
251 		if (result != 0)
252 			print("WSAIoctl error on SIO_KEEPALIVE_VALS: %d", WSAGetLastError());
253 	}
254 
255 	override void setUserTimeout(StreamSocketFD socket, Duration timeout) {}
256 
257 
	/** Starts reading from a stream socket into `buffer`.

		Params:
			socket = stream socket to read from
			buffer = destination buffer
			mode = `IOMode.immediate` issues the receive without an
				OVERLAPPED structure; other modes issue an overlapped receive
			on_read_finish = invoked with the socket, status and byte count

		For an invalid handle, a would-block condition in immediate mode, or
		an immediate error, `on_read_finish` is called synchronously with 0
		bytes. Otherwise the callback is stored in the slot, a reference to
		the socket is added and a waiter is registered; completion is then
		reported through `onIOReadCompleted`.
	*/
	override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
	{
		if (!isValid(socket)) {
			on_read_finish(socket, IOStatus.invalidHandle, 0);
			return;
		}

		// Record the request in the slot so the completion handler can
		// continue it.
		auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
		slot.read.buffer = buffer;
		slot.read.bytesTransferred = 0;
		slot.read.mode = mode;
		slot.read.wsabuf[0].len = buffer.length;
		slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();

		// Drops the buffer references again when the operation ends here.
		void resetBuffers() { slot.read.buffer = null; slot.read.wsabuf[0] = WSABUF.init; }

		// NOTE(review): in immediate mode neither an OVERLAPPED structure nor
		// a bytes-received pointer is passed. If WSARecv returns 0 in that
		// case, execution continues at the "pending" bookkeeping at the end
		// of this function - confirm that this is intended.
		auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
		auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (mode == IOMode.immediate) {
					resetBuffers();
					on_read_finish(socket, IOStatus.wouldBlock, 0);
					return;
				}
				// Non-immediate modes fall through to the bookkeeping below.
			} else if (err == WSAEWOULDBLOCK && mode == IOMode.immediate) {
				resetBuffers();
				on_read_finish(socket, IOStatus.wouldBlock, 0);
				return;
			} else {
				// Any other error is mapped to a status (this may also
				// update the connection state) and reported immediately.
				resetBuffers();
				auto st = handleReadError(err, *slot);
				on_read_finish(socket, st, 0);
				return;
			}
		}
		// The operation is outstanding: keep the callback, hold a reference
		// to the socket and register a waiter until completion.
		slot.read.callback = on_read_finish;
		addRef(socket);
		m_core.addWaiter();
	}
301 
	/** Completion handler for stream receives started by `read`.

		Params:
			dwError = error code of the completed operation (0 on success)
			cbTransferred = number of bytes received by this operation
			lpOverlapped = the OVERLAPPED_CORE of the operation; its `hEvent`
				field carries the pointer to the socket slot

		Does nothing if no read callback is stored in the slot. Otherwise
		either finishes the read by invoking the callback or issues another
		receive for the remaining part of the buffer.
	*/
	private static nothrow
	void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		// No callback stored (e.g. cleared by cancelRead) - nothing to report.
		if (!slot.streamSocket.read.callback) return;

		// Ends the operation: removes the waiter, clears the read state,
		// releases the reference taken in read() and calls the callback only
		// if releaseRef() returned true.
		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.streamSocket.read.callback;
			slot.streamSocket.read.callback = null;
			slot.streamSocket.read.buffer = null;
			slot.streamSocket.read.wsabuf[0] = WSABUF.init;
			if (slot.common.driver.releaseRef(cast(StreamSocketFD)slot.common.fd))
				cb(cast(StreamSocketFD)slot.common.fd, status, nsent);
		}

		// Account for the received bytes and advance the remaining buffer.
		slot.streamSocket.read.bytesTransferred += cbTransferred;
		slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $];

		if (dwError) {
			auto st = handleReadError(dwError, slot.streamSocket);
			invokeCallback(st, slot.streamSocket.read.bytesTransferred);
			return;
		}

		// Success with zero bytes is handled as a disconnect; the
		// handleReadError call is made for its connection state update.
		if (!cbTransferred) {
			handleReadError(WSAEDISCON, slot.streamSocket);
			invokeCallback(IOStatus.disconnected, slot.streamSocket.read.bytesTransferred);
			return;
		}

		// Done if a single chunk was requested or the buffer is full.
		if (slot.streamSocket.read.mode == IOMode.once || !slot.streamSocket.read.buffer.length) {
			invokeCallback(IOStatus.ok, slot.streamSocket.read.bytesTransferred);
			return;
		}

		// Otherwise issue another receive for the rest of the buffer.
		slot.streamSocket.read.wsabuf[0].len = slot.streamSocket.read.buffer.length;
		slot.streamSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.read.buffer.ptr; } ();
		auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
		auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.streamSocket.read.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
				// In the other modes the next completion continues here.
			} else {
				auto st = handleReadError(err, slot.streamSocket);
				invokeCallback(st, slot.streamSocket.read.bytesTransferred);
			}
		}
	}
358 
359 	private static IOStatus handleReadError(DWORD err, ref StreamSocketSlot slot)
360 	@safe nothrow {
361 		switch (err) {
362 			case 0: return IOStatus.ok;
363 			case WSAEDISCON, WSAESHUTDOWN:
364 				if (slot.state == ConnectionState.activeClose)
365 					slot.state = ConnectionState.closed;
366 				else if (slot.state != ConnectionState.closed)
367 					slot.state = ConnectionState.passiveClose;
368 				return IOStatus.disconnected;
369 			case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
370 				slot.state = ConnectionState.closed;
371 				return IOStatus.disconnected;
372 			default: return IOStatus.error;
373 		}
374 	}
375 
376 
	/** Starts writing `buffer` to a stream socket.

		Params:
			socket = stream socket to write to
			buffer = data to send
			mode = `IOMode.immediate` issues the send without an OVERLAPPED
				structure; other modes issue an overlapped send
			on_write_finish = invoked with the socket, status and byte count

		For an invalid handle, a pending result in immediate mode, or an
		immediate error, `on_write_finish` is called synchronously with 0
		bytes. Otherwise the callback is stored in the slot, a reference to
		the socket is added and a waiter is registered; completion is then
		reported through `onIOWriteCompleted`.
	*/
	override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
	{
		if (!isValid(socket)) {
			on_write_finish(socket, IOStatus.invalidHandle, 0);
			return;
		}

		// Record the request in the slot so the completion handler can
		// continue it.
		auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
		slot.write.buffer = buffer;
		slot.write.bytesTransferred = 0;
		slot.write.mode = mode;
		slot.write.wsabuf[0].len = buffer.length;
		// WSABUF has a mutable pointer, hence the cast away from const.
		slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();

		auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped.overlapped;
		auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
		auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (mode == IOMode.immediate) {
					on_write_finish(socket, IOStatus.wouldBlock, 0);
					return;
				}
				// Non-immediate modes fall through to the bookkeeping below.
			} else {
				// Map the error to a status (this may also update the
				// connection state) and report it immediately.
				auto st = handleWriteError(err, *slot);
				on_write_finish(socket, st, 0);
				return;
			}
		}
		// The operation is outstanding: keep the callback, hold a reference
		// to the socket and register a waiter until completion.
		m_sockets[socket].streamSocket.write.callback = on_write_finish;
		addRef(socket);
		m_core.addWaiter();
	}
411 
	/** Completion handler for stream sends started by `write`.

		Params:
			dwError = error code of the completed operation (0 on success)
			cbTransferred = number of bytes sent by this operation
			lpOverlapped = the OVERLAPPED_CORE of the operation; its `hEvent`
				field carries the pointer to the socket slot

		Does nothing if no write callback is stored in the slot. Otherwise
		either finishes the write by invoking the callback or issues another
		send for the remaining part of the buffer.
	*/
	private static nothrow
	void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		// No callback stored (e.g. cleared by cancelWrite) - nothing to report.
		if (!slot.streamSocket.write.callback) return;

		// Ends the operation: removes the waiter, clears the callback,
		// releases the reference taken in write() and calls the callback
		// only if releaseRef() returned true.
		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.streamSocket.write.callback;
			slot.streamSocket.write.callback = null;
			if (slot.common.driver.releaseRef(cast(StreamSocketFD)slot.common.fd))
				cb(cast(StreamSocketFD)slot.common.fd, status, nsent);
		}

		// Account for the sent bytes and advance the remaining buffer.
		slot.streamSocket.write.bytesTransferred += cbTransferred;
		slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $];

		if (dwError) {
			auto st = handleWriteError(dwError, slot.streamSocket);
			invokeCallback(st, slot.streamSocket.write.bytesTransferred);
			return;
		}

		// Done if a single chunk was requested or everything has been sent.
		if (slot.streamSocket.write.mode == IOMode.once || !slot.streamSocket.write.buffer.length) {
			invokeCallback(IOStatus.ok, slot.streamSocket.write.bytesTransferred);
			return;
		}

		// Otherwise issue another send for the rest of the buffer.
		slot.streamSocket.write.wsabuf[0].len = slot.streamSocket.write.buffer.length;
		slot.streamSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.write.buffer.ptr; } ();
		auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped.overlapped;
		auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
		auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.streamSocket.write.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
				// In the other modes the next completion continues here.
			} else {
				auto st = handleWriteError(err, slot.streamSocket);
				invokeCallback(st, slot.streamSocket.write.bytesTransferred);
			}
		}
	}
459 
460 	private static IOStatus handleWriteError(DWORD err, ref StreamSocketSlot slot)
461 	@safe nothrow {
462 		switch (err) {
463 			case 0: return IOStatus.ok;
464 			case WSAEDISCON, WSAESHUTDOWN:
465 				if (slot.state == ConnectionState.passiveClose)
466 					slot.state = ConnectionState.closed;
467 				else if (slot.state != ConnectionState.closed)
468 					slot.state = ConnectionState.activeClose;
469 				return IOStatus.disconnected;
470 			case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
471 				slot.state = ConnectionState.closed;
472 				return IOStatus.disconnected;
473 			default: return IOStatus.error;
474 		}
475 	}
476 
477 
478 	override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
479 	{
480 		if (!isValid(socket)) {
481 			on_data_available(socket, IOStatus.invalidHandle, 0);
482 			return;
483 		}
484 
485 		assert(false, "TODO!");
486 	}
487 
488 	override void shutdown(StreamSocketFD socket, bool shut_read = true, bool shut_write = true)
489 	{
490 		if (!isValid(socket)) return;
491 
492 		() @trusted { WSASendDisconnect(socket, null); } ();
493 		with (m_sockets[socket].streamSocket) {
494 			if (state == ConnectionState.passiveClose)
495 				state = ConnectionState.closed;
496 			else state = ConnectionState.activeClose;
497 		}
498 	}
499 
500 	override void cancelRead(StreamSocketFD socket)
501 	@trusted @nogc {
502 		if (!isValid(socket)) return;
503 		if (!m_sockets[socket].streamSocket.read.callback) return;
504 		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.read.overlapped);
505 		m_sockets[socket].streamSocket.read.callback = null;
506 		m_core.removeWaiter();
507 	}
508 
509 	override void cancelWrite(StreamSocketFD socket)
510 	@trusted @nogc {
511 		if (!isValid(socket)) return;
512 		if (!m_sockets[socket].streamSocket.write.callback) return;
513 		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.write.overlapped);
514 		m_sockets[socket].streamSocket.write.callback = null;
515 		m_core.removeWaiter();
516 	}
517 
518 	final override DatagramSocketFD createDatagramSocket(scope Address bind_address,
519 		scope Address target_address, DatagramCreateOptions options = DatagramCreateOptions.init)
520 	{
521 		auto fd = () @trusted { return WSASocketW(bind_address.addressFamily, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); } ();
522 		if (fd == INVALID_SOCKET)
523 			return DatagramSocketFD.invalid;
524 
525 		void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }
526 
527 		() @trusted {
528 			int tmp_reuse = 1;
529 			if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
530 				invalidateSocket();
531 				return;
532 			}
533 
534 			if (bind(fd, bind_address.name, bind_address.nameLen) != 0) {
535 				invalidateSocket();
536 				return;
537 			}
538 		} ();
539 
540 		if (fd == INVALID_SOCKET)
541 			return DatagramSocketFD.invalid;
542 
543 		auto sock = adoptDatagramSocketInternal(fd);
544 
545 		if (target_address !is null)
546 			setTargetAddress(sock, target_address);
547 
548 		return sock;
549 	}
550 
551 	final override DatagramSocketFD adoptDatagramSocket(int socket)
552 	{
553 		return adoptDatagramSocketInternal(socket);
554 	}
555 
556 	private DatagramSocketFD adoptDatagramSocketInternal(SOCKET socket)
557 	{
558 		if (m_sockets[socket].common.refCount) // FD already in use?
559 			return DatagramSocketFD.invalid;
560 
561 		void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
562 			overlapped.Internal = 0;
563 			overlapped.InternalHigh = 0;
564 			overlapped.Offset = 0;
565 			overlapped.OffsetHigh = 0;
566 			overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
567 			overlapped.driver = m_core;
568 		}
569 
570 		auto fd = initSocketSlot!DatagramSocketFD(socket);
571 		with (m_sockets[socket]) {
572 			specific = DatagramSocketSlot.init;
573 			setupOverlapped(datagramSocket.write.overlapped);
574 			setupOverlapped(datagramSocket.read.overlapped);
575 		}
576 
577 		//() @trusted { WSAAsyncSelect(socket, m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CONNECT|FD_CLOSE); } ();
578 
579 		return fd;
580 	}
581 
582 	final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
583 	{
584 		if (!isValid(socket)) return;
585 
586 		() @trusted { connect(cast(SOCKET)socket, target_address.name, target_address.nameLen); } ();
587 	}
588 
589 	final override bool setBroadcast(DatagramSocketFD socket, bool enable)
590 	{
591 		if (!isValid(socket)) return false;
592 
593 		int tmp_broad = enable;
594 		return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
595 	}
596 
597 	final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0)
598 	{
599 		import std.socket : AddressFamily;
600 
601 		if (!isValid(socket)) return false;
602 
603 		switch (multicast_address.addressFamily) {
604 			default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
605 			case AddressFamily.INET:
606 				struct ip_mreq {
607 					in_addr imr_multiaddr;   /* IP multicast address of group */
608 					in_addr imr_interface;   /* local IP address of interface */
609 				}
610 				auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } ();
611 				ip_mreq mreq;
612 				mreq.imr_multiaddr = addr.sin_addr;
613 				mreq.imr_interface.s_addr = htonl(interface_index);
614 				return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0;
615 			case AddressFamily.INET6:
616 				struct ipv6_mreq {
617 					in6_addr ipv6mr_multiaddr;
618 					uint ipv6mr_interface;
619 				}
620 				auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } ();
621 				ipv6_mreq mreq;
622 				mreq.ipv6mr_multiaddr = addr.sin6_addr;
623 				mreq.ipv6mr_interface = htonl(interface_index);
624 				return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0;
625 		}
626 	}
627 
	/** Starts receiving a datagram into `buffer`.

		Params:
			socket = datagram socket to receive from
			buffer = destination buffer
			mode = I/O mode; see the handling of `IOMode.immediate` below
			on_read_finish = invoked with the socket, status, byte count and
				source address

		For an invalid handle or an immediate error the callback is invoked
		synchronously. In immediate mode the just-issued operation is
		cancelled and `wouldBlock` is reported. Otherwise the callback is
		stored, a reference to the socket is added and a waiter is
		registered; completion is reported through `onIOReceiveCompleted`.
	*/
	override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish)
	{
		if (!isValid(socket)) {
			RefAddress addr;
			on_read_finish(socket, IOStatus.invalidHandle, 0, addr);
			return;
		}

		// Record the request in the slot so the completion handler can
		// continue it.
		auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
		slot.read.buffer = buffer;
		slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();
		slot.read.wsabuf[0].len = buffer.length;
		slot.read.mode = mode;
		// In/out parameter of WSARecvFrom: capacity of the address storage.
		slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof;

		// Unlike the stream read, the OVERLAPPED structure is always passed,
		// also in immediate mode.
		auto ovl = &slot.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
		auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err != WSA_IO_PENDING) {
				on_read_finish(socket, IOStatus.error, 0, null);
				return;
			}
		}

		// NOTE(review): in immediate mode the operation is cancelled and
		// wouldBlock is reported even if WSARecvFrom returned 0 above -
		// confirm that this is intended.
		if (mode == IOMode.immediate) {
			() @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.read.overlapped); } ();
			on_read_finish(socket, IOStatus.wouldBlock, 0, null);
			return;
		}

		// The operation is outstanding: keep the callback, hold a reference
		// to the socket and register a waiter until completion.
		slot.read.callback = on_read_finish;
		addRef(socket);
		m_core.addWaiter();
	}
665 
666 	override void cancelReceive(DatagramSocketFD socket)
667 	@trusted @nogc {
668 		if (!m_sockets[socket].datagramSocket.read.callback) return;
669 		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped);
670 		m_sockets[socket].datagramSocket.read.callback = null;
671 		m_core.removeWaiter();
672 	}
673 
674 	private static nothrow
675 	void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
676 	{
677 		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();
678 
679 		if (!slot.datagramSocket.read.callback) return;
680 
681 		void invokeCallback(IOStatus status, size_t nsent)
682 		@safe nothrow {
683 			slot.common.core.removeWaiter();
684 			auto cb = slot.datagramSocket.read.callback;
685 			slot.datagramSocket.read.callback = null;
686 			slot.datagramSocket.read.buffer = null;
687 			slot.datagramSocket.read.wsabuf = WSABUF.init;
688 			scope addr = new RefAddress(cast(sockaddr*)&slot.datagramSocket.sourceAddr, slot.datagramSocket.sourceAddrLen);
689 			if (slot.common.driver.releaseRef(cast(DatagramSocketFD)slot.common.fd))
690 				cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, status == IOStatus.ok ? addr : null);
691 		}
692 
693 		slot.datagramSocket.read.bytesTransferred += cbTransferred;
694 		slot.datagramSocket.read.buffer = slot.datagramSocket.read.buffer[cbTransferred .. $];
695 
696 		if (!dwError && (slot.datagramSocket.read.mode != IOMode.all || !slot.datagramSocket.read.buffer.length)) {
697 			invokeCallback(IOStatus.ok, cbTransferred);
698 			return;
699 		}
700 
701 		if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) {
702 			invokeCallback(IOStatus.wouldBlock, 0);
703 			return;
704 		}
705 
706 		if (dwError) {
707 			invokeCallback(IOStatus.error, 0);
708 			return;
709 		}
710 
711 		slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length;
712 		slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } ();
713 		auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped.overlapped;
714 		DWORD flags = 0;
715 		auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
716 		auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, handler); } ();
717 		if (ret == SOCKET_ERROR) {
718 			auto err = WSAGetLastError();
719 			if (err == WSA_IO_PENDING) {
720 				if (slot.datagramSocket.read.mode == IOMode.immediate) {
721 					invokeCallback(IOStatus.wouldBlock, 0);
722 				}
723 			} else {
724 				invokeCallback(IOStatus.error, 0);
725 			}
726 		}
727 	}
728 
	/** Starts sending `buffer` as a datagram.

		Params:
			socket = datagram socket to send on
			buffer = data to send
			mode = I/O mode; see the handling of `IOMode.immediate` below
			target_address = destination address, or null to pass no address
				to `WSASendTo`
			on_write_finish = invoked with the socket, status, byte count and
				target address

		For an invalid handle or an immediate error the callback is invoked
		synchronously. In immediate mode the just-issued operation is
		cancelled and `wouldBlock` is reported. Otherwise the callback is
		stored, a reference to the socket is added and a waiter is
		registered; completion is reported through `onIOSendCompleted`.
	*/
	override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_write_finish)
	{
		if (!isValid(socket)) {
			RefAddress addr;
			on_write_finish(socket, IOStatus.invalidHandle, 0, addr);
			return;
		}

		// Record the request in the slot so the completion handler can
		// continue it.
		auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
		slot.write.buffer = buffer;
		slot.write.wsabuf[0].len = buffer.length;
		// WSABUF has a mutable pointer, hence the cast away from const.
		slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();
		slot.write.mode = mode;
		// Kept in the slot so the completion handler can pass it on.
		slot.targetAddr = target_address;

		// The OVERLAPPED structure is always passed, also in immediate mode.
		auto ovl = &slot.write.overlapped.overlapped;
		auto tan = target_address ? target_address.name : null;
		auto tal = target_address ? target_address.nameLen : 0;
		auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
		auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();

		if (ret != 0) {
			auto err = WSAGetLastError();
			if (err != WSA_IO_PENDING) {
				on_write_finish(socket, IOStatus.error, 0, null);
				return;
			}
		}

		// NOTE(review): in immediate mode the operation is cancelled and
		// wouldBlock is reported even if WSASendTo returned 0 above -
		// confirm that this is intended.
		if (mode == IOMode.immediate) {
			() @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.write.overlapped); } ();
			on_write_finish(socket, IOStatus.wouldBlock, 0, null);
			return;
		}

		// The operation is outstanding: keep the callback, hold a reference
		// to the socket and register a waiter until completion.
		slot.write.callback = on_write_finish;
		addRef(socket);
		m_core.addWaiter();
	}
768 
769 	override void cancelSend(DatagramSocketFD socket)
770 	@trusted @nogc {
771 		if (!m_sockets[socket].datagramSocket.write.callback) return;
772 		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped);
773 		m_sockets[socket].datagramSocket.write.callback = null;
774 		m_core.removeWaiter();
775 	}
776 
	/** Completion handler for datagram sends started by `send`.

		Params:
			dwError = error code of the completed operation (0 on success)
			cbTransferred = number of bytes sent by this operation
			lpOverlapped = the OVERLAPPED_CORE of the operation; its `hEvent`
				field carries the pointer to the socket slot

		Does nothing if no send callback is stored in the slot. Otherwise
		either finishes the send by invoking the callback or, in
		`IOMode.all` with data left, issues another send.
	*/
	private static nothrow
	void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		// No callback stored (e.g. cleared by cancelSend) - nothing to do.
		if (!slot.datagramSocket.write.callback) return;

		// Ends the operation: removes the waiter, clears the callback and
		// target address, releases the reference taken in send() and calls
		// the callback only if releaseRef() returned true.
		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.datagramSocket.write.callback;
			auto addr = slot.datagramSocket.targetAddr;
			slot.datagramSocket.write.callback = null;
			slot.datagramSocket.targetAddr = null;

			if (slot.common.driver.releaseRef(cast(DatagramSocketFD)slot.common.fd)) {
				if (addr) {
					// Hand the stored target address to the callback.
					scope raddr = new RefAddress(addr.name, addr.nameLen);
					cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, raddr);
				} else {
					cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, null);
				}
			}
		}

		// Account for the sent bytes and advance the remaining buffer.
		slot.datagramSocket.write.bytesTransferred += cbTransferred;
		slot.datagramSocket.write.buffer = slot.datagramSocket.write.buffer[cbTransferred .. $];

		// Success: done unless IOMode.all still has data left to send.
		if (!dwError && (slot.datagramSocket.write.mode != IOMode.all || !slot.datagramSocket.write.buffer.length)) {
			invokeCallback(IOStatus.ok, cbTransferred);
			return;
		}

		// A cancelled immediate-mode send is reported as "would block".
		if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) {
			invokeCallback(IOStatus.wouldBlock, 0);
			return;
		}

		if (dwError) {
			invokeCallback(IOStatus.error, 0);
			return;
		}

		// IOMode.all with data left: issue another send for the rest.
		slot.datagramSocket.write.wsabuf[0].len = slot.datagramSocket.write.buffer.length;
		slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } ();
		auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null;
		auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0;
		auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped.overlapped;
		auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
		auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.datagramSocket.write.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
				// In the other modes the next completion continues here.
			} else {
				invokeCallback(IOStatus.error, 0);
			}
		}
	}
838 
839 	override bool isValid(SocketFD handle)
840 	const {
841 		if (handle.value >= m_sockets.length) return false;
842 		return handle.validationCounter == m_sockets[handle].common.validationCounter;
843 	}
844 
845 	override void addRef(SocketFD fd)
846 	{
847 		if (!isValid(fd)) return;
848 
849 		assert(m_sockets[fd].common.refCount > 0, "Adding reference to unreferenced socket FD.");
850 		m_sockets[fd].common.refCount++;
851 	}
852 
853 	override bool releaseRef(SocketFD fd)
854 	@nogc {
855 		import taggedalgebraic : hasType;
856 
857 		if (!isValid(fd)) return true;
858 
859 		auto slot = () @trusted { return &m_sockets[fd]; } ();
860 		nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
861 		if (--slot.common.refCount == 0) {
862 			final switch (slot.specific.kind) with (SocketVector.FieldType) {
863 				case Kind.none: break;
864 				case Kind.streamSocket:
865 					cancelRead(cast(StreamSocketFD)fd);
866 					cancelWrite(cast(StreamSocketFD)fd);
867 					m_core.discardEvents(&slot.streamSocket.read.overlapped, &slot.streamSocket.write.overlapped);
868 					break;
869 				case Kind.streamListen:
870 					if (m_sockets[fd].streamListen.acceptCallback)
871 						m_core.removeWaiter();
872 					break;
873 				case Kind.datagramSocket:
874 					cancelReceive(cast(DatagramSocketFD)fd);
875 					cancelSend(cast(DatagramSocketFD)fd);
876 					m_core.discardEvents(&slot.datagramSocket.read.overlapped, &slot.datagramSocket.write.overlapped);
877 					break;
878 			}
879 
880 			clearSocketSlot(fd);
881 			() @trusted { closesocket(fd); } ();
882 			return false;
883 		}
884 		return true;
885 	}
886 
887 	final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
888 	{
889 		if (!isValid(socket)) return false;
890 
891 		int proto, opt;
892 		final switch (option) {
893 			case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
894 			case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break;
895 		}
896 		int tmp = enable;
897 		return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
898 	}
899 
900 	final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
901 	{
902 		if (!isValid(socket)) return false;
903 
904 		int proto, opt;
905 		final switch (option) {
906 			case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
907 			case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break;
908 		}
909 		int tmp = enable;
910 		return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
911 	}
912 
913 	final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
914 	@system {
915 		return rawUserDataImpl(descriptor, size, initialize, destroy);
916 	}
917 
918 	final protected override void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
919 	@system {
920 		return rawUserDataImpl(descriptor, size, initialize, destroy);
921 	}
922 
923 	final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
924 	@system {
925 		return rawUserDataImpl(descriptor, size, initialize, destroy);
926 	}
927 
928 	private void* rawUserDataImpl(SocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
929 	@system @nogc {
930 		if (!isValid(descriptor)) return null;
931 
932 		SocketSlot* fds = &m_sockets[descriptor].common;
933 		assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
934 			"Requesting user data with differing type (destructor).");
935 		assert(size <= SocketSlot.userData.length, "Requested user data is too large.");
936 		if (size > SocketSlot.userData.length) assert(false);
937 		if (!fds.userDataDestructor) {
938 			initialize(fds.userData.ptr);
939 			fds.userDataDestructor = destroy;
940 		}
941 		return fds.userData.ptr;
942 	}
943 
944 	private FDType initSocketSlot(FDType)(SOCKET socket)
945 	{
946 		m_socketCount++;
947 		m_sockets[socket].common.refCount = 1;
948 		auto vc = ++m_sockets[socket].common.validationCounter;
949 		auto fd = FDType(socket, vc);
950 		m_sockets[socket].common.fd = fd;
951 		m_sockets[socket].common.driver = this;
952 
953 		return fd;
954 	}
955 
956 	package void clearSocketSlot(FD fd)
957 	@nogc {
958 		m_socketCount--;
959 		auto slot = () @trusted { return &m_sockets[fd]; } ();
960 		if (slot.common.userDataDestructor)
961 			() @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } ();
962 		*slot = m_sockets.FullField.init;
963 	}
964 
965 	private static nothrow extern(System)
966 	LRESULT onMessage(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam)
967 	{
968 		auto driver = () @trusted { return cast(WinAPIEventDriverSockets)cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA); } ();
969 		switch (msg) {
970 			default: break;
971 			case WM_USER_SOCKET:
972 				SOCKET sock = cast(SOCKET)wparam;
973 				auto evt = () @trusted { return LOWORD(lparam); } ();
974 				auto err = () @trusted { return HIWORD(lparam); } ();
975 				auto slot = () @trusted { return &driver.m_sockets[sock]; } ();
976 				final switch (slot.specific.kind) with (SocketVector.FieldType) {
977 					case Kind.none: break;
978 					case Kind.streamSocket:
979 						switch (evt) {
980 							default: break;
981 							case FD_CONNECT:
982 								auto cb = slot.streamSocket.connectCallback;
983 								if (!cb) break; // cancelled connect?
984 
985 								auto fd = StreamSocketFD(sock, slot.common.validationCounter);
986 
987 								slot.streamSocket.connectCallback = null;
988 								slot.common.driver.m_core.removeWaiter();
989 								if (err) {
990 									slot.streamSocket.state = ConnectionState.closed;
991 									cb(fd, ConnectStatus.refused);
992 								} else {
993 									slot.streamSocket.state = ConnectionState.connected;
994 									cb(fd, ConnectStatus.connected);
995 								}
996 								break;
997 							case FD_READ:
998 								break;
999 							case FD_WRITE:
1000 								break;
1001 						}
1002 						break;
1003 					case Kind.streamListen:
1004 						if (evt == FD_ACCEPT) {
1005 							/*
1006 			sock_t sockfd;
1007 			sockaddr_storage addr;
1008 			socklen_t addr_len = addr.sizeof;
1009 			() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
1010 			if (sockfd == -1) break;
1011 
1012 			setSocketNonBlocking(cast(SocketFD)sockfd);
1013 			auto fd = cast(StreamSocketFD)sockfd;
1014 			initSocketSlot(fd);
1015 			m_sockets[fd].specific = StreamSocketSlot.init;
1016 			m_sockets[fd].streamSocket.state = ConnectionState.connected;
1017 			m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
1018 			m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError);
1019 			releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count
1020 			//print("accept %d", sockfd);
1021 			scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
1022 			m_sockets[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
1023 							*/
1024 							SOCKADDR_STORAGE addr;
1025 							socklen_t addr_len = addr.sizeof;
1026 							auto clientsockfd = () @trusted { return WSAAccept(sock, cast(sockaddr*)&addr, &addr_len, null, 0); } ();
1027 							if (clientsockfd == INVALID_SOCKET) return 0;
1028 							auto clientsock = driver.adoptStreamInternal(clientsockfd, ConnectionState.connected);
1029 							scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
1030 							auto fd = StreamListenSocketFD(sock, slot.common.validationCounter);
1031 							slot.streamListen.acceptCallback(fd, clientsock, addrc);
1032 						}
1033 						break;
1034 					case Kind.datagramSocket:
1035 						break;
1036 				}
1037 				return 0;
1038 		}
1039 		return () @trusted { return DefWindowProcA(wnd, msg, wparam, lparam); } ();
1040 	}
1041 }
1042 
/** Registers the "VibeWin32MessageWindow" window class on first use.

	The class uses `WinAPIEventDriverSockets.onMessage` as its window
	procedure. A `__gshared` flag ensures that later calls return without
	registering again.
*/
void setupWindowClass()
@trusted nothrow @nogc {
	static __gshared registered = false;

	if (!registered) {
		WNDCLASSA wndclass;
		wndclass.lpszClassName = "VibeWin32MessageWindow";
		wndclass.lpfnWndProc = &WinAPIEventDriverSockets.onMessage;
		RegisterClassA(&wndclass);
		registered = true;
	}
}
1055 
/** Data stored for every socket regardless of its kind.

	The order of the fields is unchanged; only the accessor was moved below
	them.
*/
static struct SocketSlot {
	/// Handle of this socket. Redundant, but needed by the current IO Completion Routines based approach.
	SocketFD fd;
	/// Owning driver. Redundant, but needed by the current IO Completion Routines based approach.
	WinAPIEventDriverSockets driver;
	/// Number of references currently held on the socket.
	int refCount;
	/// Counter that handles are compared against when they are validated.
	uint validationCounter;
	/// Destructor for `userData`; `null` while no user data has been set up.
	DataInitializer userDataDestructor;
	/// Inline storage for user supplied per-socket data.
	ubyte[16*size_t.sizeof] userData;

	/// Core driver of the owning socket driver.
	@property inout(WinAPIEventDriverCore) core()
	@safe nothrow inout {
		return driver.m_core;
	}
}
1065 
/// Kind specific slot data of a stream (connection oriented) socket.
private struct StreamSocketSlot {
	/// Handle type that belongs to this slot kind.
	alias Handle = StreamSocketFD;
	/// State of the write direction (read-only buffer).
	StreamDirection!true write;
	/// State of the read direction (mutable buffer).
	StreamDirection!false read;
	/// Callback of a pending connect; reset to `null` once the connect notification has been delivered.
	ConnectCallback connectCallback;
	/// Connection state; set to `connected` or `closed` when the connect notification arrives.
	ConnectionState state;
}
1073 
/** State of one transfer direction of a stream socket.

	Params:
		RO = if `true`, `buffer` is a `const(ubyte)[]`, otherwise a mutable
			`ubyte[]`
*/
static struct StreamDirection(bool RO) {
	OVERLAPPED_CORE overlapped;
	static if (RO) {
		const(ubyte)[] buffer;
	} else {
		ubyte[] buffer;
	}
	WSABUF[1] wsabuf;
	size_t bytesTransferred;
	IOMode mode;
	IOCallback callback;

	/// Clears the buffer, the WSABUF descriptor and the callback; `overlapped`, `bytesTransferred` and `mode` are left untouched.
	void reset()
	{
		callback = null;
		wsabuf[0] = WSABUF.init;
		buffer = null;
	}
}
1085 
/// Kind specific slot data of a listening stream socket.
private struct StreamListenSocketSlot {
	/// Handle type that belongs to this slot kind.
	alias Handle = StreamListenSocketFD;
	/// Invoked with the listening handle, the accepted client socket and the peer address for each accepted connection.
	AcceptCallback acceptCallback;
}
1090 
/// Kind specific slot data of a datagram socket.
private struct DatagramSocketSlot {
	/// Handle type that belongs to this slot kind.
	alias Handle = DatagramSocketFD;
	/// State of the send direction (read-only buffer).
	DgramDirection!true write;
	/// State of the receive direction (mutable buffer).
	DgramDirection!false read;
	/// Destination address used when sending; `null` if none has been set.
	Address targetAddr;
	/// NOTE(review): presumably receives the sender address of an incoming datagram - confirm against the receive path.
	SOCKADDR_STORAGE sourceAddr;
	/// NOTE(review): presumably the length that belongs to `sourceAddr` - confirm against the receive path.
	INT sourceAddrLen;
}
1099 
/** State of one transfer direction of a datagram socket.

	Params:
		RO = if `true`, `buffer` is a `const(ubyte)[]`, otherwise a mutable
			`ubyte[]`
*/
static struct DgramDirection(bool RO) {
	OVERLAPPED_CORE overlapped;
	static if (RO) {
		const(ubyte)[] buffer;
	} else {
		ubyte[] buffer;
	}
	WSABUF[1] wsabuf;
	size_t bytesTransferred;
	IOMode mode;
	DatagramIOCallback callback;

	/// Clears the buffer, the WSABUF descriptor and the callback; `overlapped`, `bytesTransferred` and `mode` are left untouched.
	void reset()
	{
		callback = null;
		wsabuf[0] = WSABUF.init;
		buffer = null;
	}
}