module eventcore.drivers.winapi.sockets;

version (Windows):

import eventcore.driver;
import eventcore.drivers.winapi.core;
import eventcore.internal.win32;
import eventcore.internal.utils : AlgebraicChoppedVector, print, nogc_assert;
import std.socket : Address;

import core.time: Duration;

/// Window message ID under which `WSAAsyncSelect` socket notifications are posted.
private enum WM_USER_SOCKET = WM_USER + 1;


/** Socket driver implementation for Windows.

	Connect and accept notifications are requested through `WSAAsyncSelect`
	and arrive as `WM_USER_SOCKET` messages on a message-only window, while
	stream and datagram transfers are issued as overlapped WinSock operations
	with completion routines.
*/
final class WinAPIEventDriverSockets : EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
	private {
		alias SocketVector = AlgebraicChoppedVector!(SocketSlot, StreamSocketSlot, StreamListenSocketSlot, DatagramSocketSlot);
		// Per-socket state, indexed by the socket handle value.
		SocketVector m_sockets;
		// Number of currently initialized slots (see initSocketSlot/clearSocketSlot).
		size_t m_socketCount = 0;
		WinAPIEventDriverCore m_core;
		// ID of the thread that constructed the driver.
		DWORD m_tid;
		// Message-only window that receives WM_USER_SOCKET notifications.
		HWND m_hwnd;
		// NOTE(review): only read by waiterCount in the code visible here - confirm where it is updated.
		size_t m_waiters;
	}

	/** Creates the driver and its notification window.

		The window's `GWLP_USERDATA` slot is set to point at this object so
		that `onMessage` can recover the driver instance.
	*/
	this(WinAPIEventDriverCore core)
	@trusted @nogc {
		m_tid = GetCurrentThreadId();
		m_core = core;

		// setup socket event message window
		setupWindowClass();
		m_hwnd = CreateWindowA("VibeWin32MessageWindow", "VibeWin32MessageWindow", 0, 0,0,0,0, HWND_MESSAGE,null,null,null);
		SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, cast(ULONG_PTR)cast(void*)this);
		assert(cast(WinAPIEventDriverSockets)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is this);
	}

	/// Returns the value of the internal waiter counter.
	package @property size_t waiterCount() const { return m_waiters; }

	/** Reports sockets that are still registered.

		Returns: `true` (after printing a warning) if at least one socket slot
		is still in use, `false` otherwise.
	*/
	package bool checkForLeakedHandles()
	{
		if (m_socketCount == 0) return false;
		print("Warning: %s socket handles leaked at driver shutdown.", m_socketCount);
		return true;
	}

	/** Creates a TCP socket and starts connecting it to `peer_address`.

		Params:
			peer_address = Address to connect to; also determines the address family
			bind_address = Optional local address to bind to (may be `null`)
			on_connect = Invoked with the outcome; called synchronously on
				immediate success or failure, otherwise later from `onMessage`

		Returns: The new socket handle, or `StreamSocketFD.invalid` if socket
		creation, binding or the connect call failed.
	*/
	override StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect)
	@trusted {
		assert(m_tid == GetCurrentThreadId());

		auto fd = WSASocketW(peer_address.addressFamily, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED);
		if (fd == INVALID_SOCKET) {
			on_connect(StreamSocketFD.invalid, ConnectStatus.socketCreateFailure);
			return StreamSocketFD.invalid;
		}

		// closes the raw handle and marks the local variable as unusable
		void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }

		// stays 0 (success) when no bind address was given
		int bret;
		if (bind_address !is null)
			bret = bind(fd, bind_address.name, bind_address.nameLen);
		if (bret != 0) {
			invalidateSocket();
			on_connect(StreamSocketFD.invalid, ConnectStatus.bindFailure);
			return StreamSocketFD.invalid;
		}

		// registers the slot and requests FD_CONNECT/FD_CLOSE notifications
		auto sock = adoptStreamInternal(fd, ConnectionState.connecting);

		auto ret = .connect(fd, peer_address.name, peer_address.nameLen);
		//auto ret = WSAConnect(m_socket, peer_address.name, peer_address.nameLen, null, null, null, null);

		if (ret == 0) {
			m_sockets[sock].specific.state = ConnectionState.connected;
			on_connect(sock, ConnectStatus.connected);
			return sock;
		}

		auto err = WSAGetLastError();
		if (err == WSAEWOULDBLOCK) {
			// connection is in progress - the callback gets invoked from
			// the FD_CONNECT handler in onMessage
			with (m_sockets[sock].streamSocket) {
				connectCallback = on_connect;
			}

			m_core.addWaiter();
			return sock;
		} else {
			clearSocketSlot(sock);
			invalidateSocket();
			on_connect(StreamSocketFD.invalid, ConnectStatus.unknownError);
			return StreamSocketFD.invalid;
		}
	}

	/** Cancels a pending connect started with `connectStream`.

		Marks the socket as closed, drops the stored connect callback and
		removes the waiter. The socket handle itself is not closed here.
	*/
	final override void cancelConnectStream(StreamSocketFD sock)
	{
		if (!isValid(sock)) return;

		with (m_sockets[sock].streamSocket) {
			assert(state == ConnectionState.connecting,
				"Must be in 'connecting' state when calling cancelConnection.");

			state = ConnectionState.closed;
			connectCallback = null;
			m_core.removeWaiter();
		}
	}

	/// Registers an existing socket handle as a connected stream socket.
	override StreamSocketFD adoptStream(int socket)
	{
		return adoptStreamInternal(socket, ConnectionState.connected);
	}

	/** Initializes the slot for a stream socket and sets its initial state.

		Returns: The new handle, or `StreamSocketFD.invalid` if the slot for
		this socket value is still referenced.
	*/
	private StreamSocketFD adoptStreamInternal(SOCKET socket, ConnectionState state)
	{
		if (m_sockets[socket].common.refCount) // FD already in use?
			return StreamSocketFD.invalid;

		// done by wsaasyncselect
		//uint enable = 1;
		//() @trusted { ioctlsocket(socket, FIONBIO, &enable); } ();

		// hEvent is used to carry the slot pointer - the completion handlers
		// cast it back to locate the socket's state
		void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
			overlapped.Internal = 0;
			overlapped.InternalHigh = 0;
			overlapped.Offset = 0;
			overlapped.OffsetHigh = 0;
			overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
			overlapped.driver = m_core;
		}

		auto fd = initSocketSlot!StreamSocketFD(socket);
		with (m_sockets[socket]) {
			specific = StreamSocketSlot.init;
			streamSocket.state = state;
			setupOverlapped(streamSocket.write.overlapped);
			setupOverlapped(streamSocket.read.overlapped);
		}

		// connect/close events get posted to the message window
		() @trusted { WSAAsyncSelect(socket, m_hwnd, WM_USER_SOCKET, FD_CONNECT|FD_CLOSE); } ();

		return fd;
	}

	alias listenStream = EventDriverSockets.listenStream;
	/** Creates a TCP socket that listens on `bind_address`.

		`SO_REUSEADDR` is set only if `StreamListenOptions.reuseAddress` is
		part of `options`. The listen backlog is fixed at 128. If `on_accept`
		is non-null, accepting starts immediately via `waitForConnections`.

		Returns: The listening socket handle, or `StreamListenSocketFD.invalid`
		if any of the setup calls failed.
	*/
	override StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept)
	{
		auto fd = () @trusted { return WSASocketW(bind_address.addressFamily, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); } ();
		if (fd == INVALID_SOCKET)
			return StreamListenSocketFD.invalid;

		void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }

		// any failure inside closes the socket and leaves fd == INVALID_SOCKET
		() @trusted {
			if (options & StreamListenOptions.reuseAddress) {
				int tmp_reuse = 1;
				if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
					invalidateSocket();
					return;
				}
			}

			// FIXME: should SO_EXCLUSIVEADDRUSE be used if StreamListenOptions.reuseAddress isn't set?

			if (bind(fd, bind_address.name, bind_address.nameLen) != 0) {
				invalidateSocket();
				return;
			}
			if (listen(fd, 128) != 0) {
				invalidateSocket();
				return;
			}
		} ();

		if (fd == INVALID_SOCKET)
			return StreamListenSocketFD.invalid;

		auto sock = initSocketSlot!StreamListenSocketFD(fd);
		m_sockets[sock].specific = StreamListenSocketSlot.init;

		if (on_accept) waitForConnections(sock, on_accept);

		return sock;
	}

	/** Starts accepting connections on a listening socket.

		Stores `on_accept`, requests `FD_ACCEPT` notifications for the socket
		and adds a waiter. Must not be called while an accept callback is
		already set.
	*/
	override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
	{
		if (!isValid(sock)) return;

		assert(!m_sockets[sock].streamListen.acceptCallback);
		m_sockets[sock].streamListen.acceptCallback = on_accept;
		() @trusted { WSAAsyncSelect(sock, m_hwnd, WM_USER_SOCKET, FD_ACCEPT); } ();
		m_core.addWaiter();
	}

	/// Returns the tracked connection state, or `closed` for an invalid handle.
	override ConnectionState getConnectionState(StreamSocketFD sock)
	{
		if (!isValid(sock)) return ConnectionState.closed;

		return m_sockets[sock].streamSocket.state;
	}

	/** Queries the local address of a socket using `getsockname`.

		Returns: `true` if `dst` was filled and capped to the returned length,
		`false` for an invalid handle or a failed query.
	*/
	override bool getLocalAddress(SocketFD sock, scope RefAddress dst)
	{
		if (!isValid(sock)) return false;

		socklen_t addr_len = dst.nameLen;
		if (() @trusted { return getsockname(sock, dst.name, &addr_len); } () != 0)
			return false;
		dst.cap(addr_len);
		return true;
	}

	/** Queries the peer address of a socket using `getpeername`.

		Returns: `true` if `dst` was filled and capped to the returned length,
		`false` for an invalid handle or a failed query.
	*/
	override bool getRemoteAddress(SocketFD sock, scope RefAddress dst)
	{
		if (!isValid(sock)) return false;

		socklen_t addr_len = dst.nameLen;
		if (() @trusted { return getpeername(sock, dst.name, &addr_len); } () != 0)
			return false;
		dst.cap(addr_len);
		return true;
	}

	/// Sets the `TCP_NODELAY` option; the `setsockopt` result is ignored.
	override void setTCPNoDelay(StreamSocketFD socket, bool enable)
	@trusted {
		if (!isValid(socket)) return;

		BOOL eni = enable;
		setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &eni, eni.sizeof);
	}

	/// Sets the `SO_KEEPALIVE` option; the `setsockopt` result is ignored.
	override void setKeepAlive(StreamSocketFD socket, bool enable)
	@trusted {
		if (!isValid(socket)) return;

BOOL eni = enable;
		setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &eni, eni.sizeof);
	}

	/** Enables keep-alive with explicit timing via `SIO_KEEPALIVE_VALS`.

		`idle` and `interval` are passed in milliseconds. `probeCount` is not
		used by this implementation. A failing `WSAIoctl` is only logged.
	*/
	override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
	{
		if (!isValid(socket)) return;

		tcp_keepalive opts = tcp_keepalive(1, cast(c_ulong) idle.total!"msecs"(),
			cast(c_ulong) interval.total!"msecs");
		int result = WSAIoctl(socket, SIO_KEEPALIVE_VALS, &opts, cast(DWORD) tcp_keepalive.sizeof,
			null, 0, null, null, null);
		if (result != 0)
			print("WSAIoctl error on SIO_KEEPALIVE_VALS: %d", WSAGetLastError());
	}

	/// Not implemented for this driver (no-op).
	override void setUserTimeout(StreamSocketFD socket, Duration timeout) {}


	/** Starts reading from a stream socket into `buffer`.

		The request state is stored in the socket's slot and a `WSARecv` call
		is issued. If the call fails synchronously, `on_read_finish` is invoked
		directly - with `wouldBlock` in immediate mode, otherwise with the
		status determined by `handleReadError`. In all other cases the callback
		is stored, a reference and a waiter are added and the request gets
		finished by `onIOReadCompleted`.

		NOTE(review): in `IOMode.immediate` no OVERLAPPED structure is passed;
		it is not visible here what completes the request when `WSARecv`
		succeeds right away - confirm.
	*/
	override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
	{
		if (!isValid(socket)) {
			on_read_finish(socket, IOStatus.invalidHandle, 0);
			return;
		}

		auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
		slot.read.buffer = buffer;
		slot.read.bytesTransferred = 0;
		slot.read.mode = mode;
		slot.read.wsabuf[0].len = buffer.length;
		slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();

		// drops the buffer references again if the request ends synchronously
		void resetBuffers() { slot.read.buffer = null; slot.read.wsabuf[0] = WSABUF.init; }

		auto ovl = mode == IOMode.immediate ? null : &slot.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
		auto ret = () @trusted { return WSARecv(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (mode == IOMode.immediate) {
					resetBuffers();
					on_read_finish(socket, IOStatus.wouldBlock, 0);
					return;
				}
				// otherwise: operation is in flight, fall through to register the callback
			} else if (err == WSAEWOULDBLOCK && mode == IOMode.immediate) {
				resetBuffers();
				on_read_finish(socket, IOStatus.wouldBlock, 0);
				return;
			} else {
				resetBuffers();
				auto st = handleReadError(err, *slot);
				on_read_finish(socket, st, 0);
				return;
			}
		}
		slot.read.callback = on_read_finish;
		addRef(socket);
		m_core.addWaiter();
	}

	/** Completion handler for stream reads.

		Accounts for the transferred bytes and either finishes the request
		(error, zero-length read, `IOMode.once` or buffer filled) or issues
		another `WSARecv` for the remaining part of the buffer. Does nothing
		if the read callback has been cleared in the meantime.
	*/
	private static nothrow
	void onIOReadCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		// hEvent carries the slot pointer (see adoptStreamInternal)
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		if (!slot.streamSocket.read.callback) return;

		// resets the request state, drops the reference taken by read() and
		// calls the user callback only if the socket is still referenced
		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.streamSocket.read.callback;
			slot.streamSocket.read.callback = null;
			slot.streamSocket.read.buffer = null;
			slot.streamSocket.read.wsabuf[0] = WSABUF.init;
			if (slot.common.driver.releaseRef(cast(StreamSocketFD)slot.common.fd))
				cb(cast(StreamSocketFD)slot.common.fd, status, nsent);
		}

		slot.streamSocket.read.bytesTransferred += cbTransferred;
		slot.streamSocket.read.buffer = slot.streamSocket.read.buffer[cbTransferred .. $];

		if (dwError) {
			auto st = handleReadError(dwError, slot.streamSocket);
			invokeCallback(st, slot.streamSocket.read.bytesTransferred);
			return;
		}

		// a successful zero-byte read is handled like a disconnect
		if (!cbTransferred) {
			handleReadError(WSAEDISCON, slot.streamSocket);
			invokeCallback(IOStatus.disconnected, slot.streamSocket.read.bytesTransferred);
			return;
		}

		if (slot.streamSocket.read.mode == IOMode.once || !slot.streamSocket.read.buffer.length) {
			invokeCallback(IOStatus.ok, slot.streamSocket.read.bytesTransferred);
			return;
		}

		// more data requested - read into the remaining part of the buffer
		slot.streamSocket.read.wsabuf[0].len = slot.streamSocket.read.buffer.length;
		slot.streamSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.read.buffer.ptr; } ();
		auto ovl = slot.streamSocket.read.mode == IOMode.immediate ? null : &slot.streamSocket.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReadCompleted, DWORD);
		auto ret = () @trusted { return WSARecv(slot.common.fd, &slot.streamSocket.read.wsabuf[0], slot.streamSocket.read.wsabuf.length, null, &flags, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.streamSocket.read.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
			} else {
				auto st = handleReadError(err, slot.streamSocket);
				invokeCallback(st, slot.streamSocket.read.bytesTransferred);
			}
		}
	}

	/** Maps a WinSock error from a read to an `IOStatus` and updates the
		connection state.

		A graceful disconnect moves `activeClose` to `closed` and any other
		non-closed state to `passiveClose`; abort/reset/timeout errors set
		the state to `closed`.
	*/
	private static IOStatus handleReadError(DWORD err, ref StreamSocketSlot slot)
	@safe nothrow {
		switch (err) {
			case 0: return IOStatus.ok;
			case WSAEDISCON, WSAESHUTDOWN:
				if (slot.state == ConnectionState.activeClose)
					slot.state = ConnectionState.closed;
				else if (slot.state != ConnectionState.closed)
					slot.state = ConnectionState.passiveClose;
				return IOStatus.disconnected;
			case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
				slot.state = ConnectionState.closed;
				return IOStatus.disconnected;
			default: return IOStatus.error;
		}
	}


	/** Starts writing `buffer` to a stream socket.

		Mirrors `read`: the request state is stored in the slot and a
		`WSASend` call is issued. Synchronous failures are reported directly
		through `on_write_finish`; otherwise the callback is stored, a
		reference and a waiter are added and `onIOWriteCompleted` finishes
		the request.
	*/
	override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
	{
		if (!isValid(socket)) {
			on_write_finish(socket, IOStatus.invalidHandle, 0);
			return;
		}

		auto slot = () @trusted { return &m_sockets[socket].streamSocket(); } ();
		slot.write.buffer = buffer;
		slot.write.bytesTransferred = 0;
		slot.write.mode = mode;
		slot.write.wsabuf[0].len = buffer.length;
		slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();

		auto ovl = mode == IOMode.immediate ? null : &m_sockets[socket].streamSocket.write.overlapped.overlapped;
		auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
		auto ret = () @trusted { return WSASend(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (mode == IOMode.immediate) {
					on_write_finish(socket, IOStatus.wouldBlock, 0);
					return;
				}
				// otherwise: operation is in flight, fall through to register the callback
			} else {
				auto st = handleWriteError(err, *slot);
				on_write_finish(socket, st, 0);
				return;
			}
		}
		m_sockets[socket].streamSocket.write.callback = on_write_finish;
		addRef(socket);
		m_core.addWaiter();
	}

	/** Completion handler for stream writes.

		Accounts for the transferred bytes and either finishes the request
		(error, `IOMode.once` or buffer fully sent) or issues another
		`WSASend` for the remaining data. Does nothing if the write callback
		has been cleared in the meantime.
	*/
	private static nothrow
	void onIOWriteCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		// hEvent carries the slot pointer (see adoptStreamInternal)
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		if (!slot.streamSocket.write.callback) return;

		// drops the reference taken by write() and calls the user callback
		// only if the socket is still referenced
		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.streamSocket.write.callback;
			slot.streamSocket.write.callback = null;
			if (slot.common.driver.releaseRef(cast(StreamSocketFD)slot.common.fd))
				cb(cast(StreamSocketFD)slot.common.fd, status, nsent);
		}

		slot.streamSocket.write.bytesTransferred += cbTransferred;
		slot.streamSocket.write.buffer = slot.streamSocket.write.buffer[cbTransferred .. $];

		if (dwError) {
			auto st = handleWriteError(dwError, slot.streamSocket);
			invokeCallback(st, slot.streamSocket.write.bytesTransferred);
			return;
		}

		if (slot.streamSocket.write.mode == IOMode.once || !slot.streamSocket.write.buffer.length) {
			invokeCallback(IOStatus.ok, slot.streamSocket.write.bytesTransferred);
			return;
		}

		// data left to send - continue with the remaining part of the buffer
		slot.streamSocket.write.wsabuf[0].len = slot.streamSocket.write.buffer.length;
		slot.streamSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.streamSocket.write.buffer.ptr; } ();
		auto ovl = slot.streamSocket.write.mode == IOMode.immediate ? null : &slot.streamSocket.write.overlapped.overlapped;
		auto handler = &overlappedIOHandler!(onIOWriteCompleted, DWORD);
		auto ret = () @trusted { return WSASend(slot.common.fd, &slot.streamSocket.write.wsabuf[0], slot.streamSocket.write.wsabuf.length, null, 0, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.streamSocket.write.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
			} else {
				auto st = handleWriteError(err, slot.streamSocket);
				invokeCallback(st, slot.streamSocket.write.bytesTransferred);
			}
		}
	}

	/** Maps a WinSock error from a write to an `IOStatus` and updates the
		connection state.

		A graceful disconnect moves `passiveClose` to `closed` and any other
		non-closed state to `activeClose`; abort/reset/timeout errors set
		the state to `closed`.
	*/
	private static IOStatus handleWriteError(DWORD err, ref StreamSocketSlot slot)
	@safe nothrow {
		switch (err) {
			case 0: return IOStatus.ok;
			case WSAEDISCON, WSAESHUTDOWN:
				if (slot.state == ConnectionState.passiveClose)
					slot.state = ConnectionState.closed;
				else if (slot.state != ConnectionState.closed)
					slot.state = ConnectionState.activeClose;
				return IOStatus.disconnected;
			case WSAECONNABORTED, WSAECONNRESET, WSAENETRESET, WSAETIMEDOUT:
				slot.state = ConnectionState.closed;
				return IOStatus.disconnected;
			default: return IOStatus.error;
		}
	}


	/** Not implemented yet.

		Reports `invalidHandle` for an invalid socket and otherwise fails
		with an assertion.
	*/
	override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
	{
		if (!isValid(socket)) {
			on_data_available(socket, IOStatus.invalidHandle, 0);
			return;
		}

		assert(false, "TODO!");
	}

	/** Initiates a shutdown using `WSASendDisconnect` and updates the
		tracked connection state.

		`shut_read` and `shut_write` are not consulted by this implementation.
	*/
	override void shutdown(StreamSocketFD socket, bool shut_read = true, bool shut_write = true)
	{
		if (!isValid(socket)) return;

		() @trusted { WSASendDisconnect(socket, null); } ();
		with (m_sockets[socket].streamSocket) {
			if (state == ConnectionState.passiveClose)
				state = ConnectionState.closed;
			else state = ConnectionState.activeClose;
		}
	}

	/** Cancels a pending stream read.

		Requests cancellation of the overlapped operation, clears the read
		callback (so that `onIOReadCompleted` ignores the completion) and
		removes the waiter.

		NOTE(review): the reference added by `read` is not released on this
		path - confirm whether that is intended.
	*/
	override void cancelRead(StreamSocketFD socket)
	@trusted @nogc {
		if (!isValid(socket)) return;
		if (!m_sockets[socket].streamSocket.read.callback) return;
		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.read.overlapped);
		m_sockets[socket].streamSocket.read.callback = null;
		m_core.removeWaiter();
	}

	/** Cancels a pending stream write.

		Requests cancellation of the overlapped operation, clears the write
		callback (so that `onIOWriteCompleted` ignores the completion) and
		removes the waiter.

		NOTE(review): the reference added by `write` is not released on this
		path - confirm whether that is intended.
	*/
	override void cancelWrite(StreamSocketFD socket)
	@trusted @nogc {
		if (!isValid(socket)) return;
		if (!m_sockets[socket].streamSocket.write.callback) return;
		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].streamSocket.write.overlapped);
		m_sockets[socket].streamSocket.write.callback = null;
		m_core.removeWaiter();
	}

	/** Creates a UDP socket bound to `bind_address`.

		`SO_REUSEADDR` is always set before binding. If `target_address` is
		non-null, it is set as the default target via `setTargetAddress`.
		`options` is not consulted by this implementation.

		Returns: The new socket handle, or `DatagramSocketFD.invalid` if
		creating, configuring or binding the socket failed.
	*/
	final override DatagramSocketFD createDatagramSocket(scope Address bind_address,
		scope Address target_address, DatagramCreateOptions options = DatagramCreateOptions.init)
	{
		auto fd = () @trusted { return WSASocketW(bind_address.addressFamily, SOCK_DGRAM, IPPROTO_UDP, null, 0, WSA_FLAG_OVERLAPPED); } ();
		if (fd == INVALID_SOCKET)
			return DatagramSocketFD.invalid;

		// closes the raw handle and marks the local variable as unusable
		void invalidateSocket() @nogc @trusted nothrow { closesocket(fd); fd = INVALID_SOCKET; }

() @trusted {
			// any failure closes the socket and leaves fd == INVALID_SOCKET
			int tmp_reuse = 1;
			if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0) {
				invalidateSocket();
				return;
			}

			if (bind(fd, bind_address.name, bind_address.nameLen) != 0) {
				invalidateSocket();
				return;
			}
		} ();

		if (fd == INVALID_SOCKET)
			return DatagramSocketFD.invalid;

		auto sock = adoptDatagramSocketInternal(fd);

		if (target_address !is null)
			setTargetAddress(sock, target_address);

		return sock;
	}

	/// Registers an existing socket handle as a datagram socket.
	final override DatagramSocketFD adoptDatagramSocket(int socket)
	{
		return adoptDatagramSocketInternal(socket);
	}

	/** Initializes the slot for a datagram socket.

		Returns: The new handle, or `DatagramSocketFD.invalid` if the slot for
		this socket value is still referenced.
	*/
	private DatagramSocketFD adoptDatagramSocketInternal(SOCKET socket)
	{
		if (m_sockets[socket].common.refCount) // FD already in use?
			return DatagramSocketFD.invalid;

		// hEvent is used to carry the slot pointer - the completion handlers
		// cast it back to locate the socket's state
		void setupOverlapped(ref OVERLAPPED_CORE overlapped) @trusted @nogc nothrow {
			overlapped.Internal = 0;
			overlapped.InternalHigh = 0;
			overlapped.Offset = 0;
			overlapped.OffsetHigh = 0;
			overlapped.hEvent = cast(HANDLE)cast(void*)&m_sockets[socket];
			overlapped.driver = m_core;
		}

		auto fd = initSocketSlot!DatagramSocketFD(socket);
		with (m_sockets[socket]) {
			specific = DatagramSocketSlot.init;
			setupOverlapped(datagramSocket.write.overlapped);
			setupOverlapped(datagramSocket.read.overlapped);
		}

		//() @trusted { WSAAsyncSelect(socket, m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CONNECT|FD_CLOSE); } ();

		return fd;
	}

	/** Sets the default target address of a datagram socket by calling
		`connect` on it. The result of the call is ignored.
	*/
	final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
	{
		if (!isValid(socket)) return;

		() @trusted { connect(cast(SOCKET)socket, target_address.name, target_address.nameLen); } ();
	}

	/** Enables or disables `SO_BROADCAST`.

		Returns: `true` if the option was set successfully.
	*/
	final override bool setBroadcast(DatagramSocketFD socket, bool enable)
	{
		if (!isValid(socket)) return false;

		int tmp_broad = enable;
		return () @trusted { return setsockopt(cast(SOCKET)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
	}

	/** Joins the multicast group given by `multicast_address`.

		Params:
			socket = Datagram socket that should join the group
			multicast_address = IPv4 or IPv6 group address
			interface_index = Index of the local interface to use; 0 selects
				the default interface

		Returns: `true` if the membership was added, `false` for an invalid
		handle or a failed `setsockopt` call.
	*/
	final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0)
	{
		import std.socket : AddressFamily;
		import core.sys.windows.winsock2 : IPPROTO_IPV6;

		if (!isValid(socket)) return false;

		switch (multicast_address.addressFamily) {
			default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
			case AddressFamily.INET:
				struct ip_mreq {
					in_addr imr_multiaddr;   /* IP multicast address of group */
					in_addr imr_interface;   /* local IP address of interface */
				}
				auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } ();
				ip_mreq mreq;
				mreq.imr_multiaddr = addr.sin_addr;
				// for IPv4 the index is passed in place of an interface
				// address and hence needs to be in network byte order
				mreq.imr_interface.s_addr = htonl(interface_index);
				return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0;
			case AddressFamily.INET6:
				struct ipv6_mreq {
					in6_addr ipv6mr_multiaddr;
					uint ipv6mr_interface;
				}
				auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } ();
				ipv6_mreq mreq;
				mreq.ipv6mr_multiaddr = addr.sin6_addr;
				// ipv6mr_interface is a plain interface index in host byte
				// order - it must not be converted with htonl
				mreq.ipv6mr_interface = interface_index;
				// IPV6_JOIN_GROUP is an IPPROTO_IPV6 level option; passing
				// IPPROTO_IP here would address an unrelated IPv4 option
				return () @trusted { return setsockopt(cast(SOCKET)socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0;
		}
	}

	/** Starts receiving a datagram into `buffer`.

		The request state is stored in the socket's slot and an overlapped
		`WSARecvFrom` is issued. In `IOMode.immediate` the operation is
		cancelled again right away and `wouldBlock` is reported; otherwise
		`onIOReceiveCompleted` finishes the request.
	*/
	override void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_read_finish)
	{
		if (!isValid(socket)) {
			RefAddress addr;
			on_read_finish(socket, IOStatus.invalidHandle, 0, addr);
			return;
		}

		auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
		slot.read.buffer = buffer;
		slot.read.wsabuf[0].buf = () @trusted { return buffer.ptr; } ();
		slot.read.wsabuf[0].len = buffer.length;
		slot.read.mode = mode;
		// in/out parameter of WSARecvFrom - must hold the capacity of sourceAddr
		slot.sourceAddrLen = DatagramSocketSlot.sourceAddr.sizeof;
auto ovl = &slot.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
		auto ret = () @trusted { return WSARecvFrom(socket, &slot.read.wsabuf[0], slot.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.sourceAddr, &slot.sourceAddrLen, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err != WSA_IO_PENDING) {
				on_read_finish(socket, IOStatus.error, 0, null);
				return;
			}
		}

		if (mode == IOMode.immediate) {
			// no callback has been stored at this point, so the completion
			// of the cancelled operation is ignored by onIOReceiveCompleted
			() @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.read.overlapped); } ();
			on_read_finish(socket, IOStatus.wouldBlock, 0, null);
			return;
		}

		slot.read.callback = on_read_finish;
		addRef(socket);
		m_core.addWaiter();
	}

	/** Cancels a pending datagram receive operation.

		Does nothing for an invalid handle or if no receive is pending.
		Otherwise requests cancellation of the overlapped operation, clears
		the callback (so that `onIOReceiveCompleted` ignores the completion)
		and removes the waiter.
	*/
	override void cancelReceive(DatagramSocketFD socket)
	@trusted @nogc {
		// same guard as in cancelRead/cancelWrite - avoids touching the slot
		// of a stale or foreign handle
		if (!isValid(socket)) return;
		if (!m_sockets[socket].datagramSocket.read.callback) return;
		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.read.overlapped);
		m_sockets[socket].datagramSocket.read.callback = null;
		m_core.removeWaiter();
	}

	/** Completion handler for datagram receive operations.

		Finishes the request on success (unless `IOMode.all` was requested
		and buffer space is left), reports `wouldBlock` for an aborted
		immediate mode receive and `error` for any other failure. For
		`IOMode.all` another `WSARecvFrom` is issued for the remaining part
		of the buffer. Does nothing if the callback has been cleared in the
		meantime.
	*/
	private static nothrow
	void onIOReceiveCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		// hEvent carries the slot pointer (see adoptDatagramSocketInternal)
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		if (!slot.datagramSocket.read.callback) return;

		// resets the request state, drops the reference taken by receive()
		// and calls the user callback only if the socket is still referenced;
		// the source address is only passed along for successful receives
		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.datagramSocket.read.callback;
			slot.datagramSocket.read.callback = null;
			slot.datagramSocket.read.buffer = null;
			slot.datagramSocket.read.wsabuf = WSABUF.init;
			scope addr = new RefAddress(cast(sockaddr*)&slot.datagramSocket.sourceAddr, slot.datagramSocket.sourceAddrLen);
			if (slot.common.driver.releaseRef(cast(DatagramSocketFD)slot.common.fd))
				cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, status == IOStatus.ok ? addr : null);
		}

		slot.datagramSocket.read.bytesTransferred += cbTransferred;
		slot.datagramSocket.read.buffer = slot.datagramSocket.read.buffer[cbTransferred .. $];

		if (!dwError && (slot.datagramSocket.read.mode != IOMode.all || !slot.datagramSocket.read.buffer.length)) {
			invokeCallback(IOStatus.ok, cbTransferred);
			return;
		}

		// this is the receive path, so the mode of the *read* request is
		// what decides how an aborted operation gets reported
		if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.read.mode == IOMode.immediate) {
			invokeCallback(IOStatus.wouldBlock, 0);
			return;
		}

		if (dwError) {
			invokeCallback(IOStatus.error, 0);
			return;
		}

		// IOMode.all with buffer space left - continue receiving
		slot.datagramSocket.read.wsabuf[0].len = slot.datagramSocket.read.buffer.length;
		slot.datagramSocket.read.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.read.buffer.ptr; } ();
		auto ovl = slot.datagramSocket.read.mode == IOMode.immediate ? null : &slot.datagramSocket.read.overlapped.overlapped;
		DWORD flags = 0;
		auto handler = &overlappedIOHandler!(onIOReceiveCompleted, DWORD);
		auto ret = () @trusted { return WSARecvFrom(slot.common.fd, &slot.datagramSocket.read.wsabuf[0], slot.datagramSocket.read.wsabuf.length, null, &flags, cast(SOCKADDR*)&slot.datagramSocket.sourceAddr, &slot.datagramSocket.sourceAddrLen, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.datagramSocket.read.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
			} else {
				invokeCallback(IOStatus.error, 0);
			}
		}
	}

	/** Starts sending `buffer` as a datagram.

		Params:
			socket = Datagram socket to send on
			buffer = Data to send
			mode = In `IOMode.immediate` the operation is cancelled right
				after being issued and `wouldBlock` is reported
			target_address = Destination address; if `null`, no address is
				passed to `WSASendTo`
			on_write_finish = Invoked with the result of the operation
	*/
	override void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_write_finish)
	{
		if (!isValid(socket)) {
			RefAddress addr;
			on_write_finish(socket, IOStatus.invalidHandle, 0, addr);
			return;
		}

		auto slot = () @trusted { return &m_sockets[socket].datagramSocket(); } ();
		slot.write.buffer = buffer;
		slot.write.wsabuf[0].len = buffer.length;
		slot.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)buffer.ptr; } ();
		slot.write.mode = mode;
		// kept in the slot, so that the completion handler can report it
		slot.targetAddr = target_address;

		auto ovl = &slot.write.overlapped.overlapped;
		auto tan = target_address ? target_address.name : null;
		auto tal = target_address ? target_address.nameLen : 0;
		auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
		auto ret = () @trusted { return WSASendTo(socket, &slot.write.wsabuf[0], slot.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();

		if (ret != 0) {
			auto err = WSAGetLastError();
			if (err != WSA_IO_PENDING) {
				on_write_finish(socket, IOStatus.error, 0, null);
				return;
			}
		}

		if (mode == IOMode.immediate) {
			// no callback has been stored at this point, so the completion
			// of the cancelled operation is ignored by onIOSendCompleted
			() @trusted { CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&slot.write.overlapped); } ();
			on_write_finish(socket, IOStatus.wouldBlock, 0, null);
			return;
		}

		slot.write.callback = on_write_finish;
		addRef(socket);
		m_core.addWaiter();
	}

	/** Cancels a pending datagram send operation.

		Does nothing for an invalid handle or if no send is pending.
		Otherwise requests cancellation of the overlapped operation, clears
		the callback (so that `onIOSendCompleted` ignores the completion)
		and removes the waiter.
	*/
	override void cancelSend(DatagramSocketFD socket)
	@trusted @nogc {
		// same guard as in cancelRead/cancelWrite - avoids touching the slot
		// of a stale or foreign handle
		if (!isValid(socket)) return;
		if (!m_sockets[socket].datagramSocket.write.callback) return;
		CancelIoEx(cast(HANDLE)cast(SOCKET)socket, cast(LPOVERLAPPED)&m_sockets[socket].datagramSocket.write.overlapped);
		m_sockets[socket].datagramSocket.write.callback = null;
		m_core.removeWaiter();
	}

	/** Completion handler for datagram send operations.

		Does nothing if the send callback has been cleared in the meantime.
	*/
	private static nothrow
	void onIOSendCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED_CORE* lpOverlapped)
	{
		// hEvent carries the slot pointer (see adoptDatagramSocketInternal)
		auto slot = () @trusted { return cast(SocketVector.FullField*)lpOverlapped.hEvent; } ();

		if (!slot.datagramSocket.write.callback) return;

		void invokeCallback(IOStatus status, size_t nsent)
		@safe nothrow {
			slot.common.core.removeWaiter();
			auto cb = slot.datagramSocket.write.callback;
			auto addr = slot.datagramSocket.targetAddr;
slot.datagramSocket.write.callback = null;
			slot.datagramSocket.targetAddr = null;

			// the user callback is only invoked if the socket is still
			// referenced after dropping the reference taken by send()
			if (slot.common.driver.releaseRef(cast(DatagramSocketFD)slot.common.fd)) {
				if (addr) {
					scope raddr = new RefAddress(addr.name, addr.nameLen);
					cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, raddr);
				} else {
					cb(cast(DatagramSocketFD)slot.common.fd, status, nsent, null);
				}
			}
		}

		slot.datagramSocket.write.bytesTransferred += cbTransferred;
		slot.datagramSocket.write.buffer = slot.datagramSocket.write.buffer[cbTransferred .. $];

		// success: done, unless IOMode.all was requested and data is left
		if (!dwError && (slot.datagramSocket.write.mode != IOMode.all || !slot.datagramSocket.write.buffer.length)) {
			invokeCallback(IOStatus.ok, cbTransferred);
			return;
		}

		if (dwError == WSA_OPERATION_ABORTED && slot.datagramSocket.write.mode == IOMode.immediate) {
			invokeCallback(IOStatus.wouldBlock, 0);
			return;
		}

		if (dwError) {
			invokeCallback(IOStatus.error, 0);
			return;
		}

		// IOMode.all with data left - send the remaining part of the buffer
		slot.datagramSocket.write.wsabuf[0].len = slot.datagramSocket.write.buffer.length;
		slot.datagramSocket.write.wsabuf[0].buf = () @trusted { return cast(ubyte*)slot.datagramSocket.write.buffer.ptr; } ();
		auto tan = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.name : null;
		auto tal = slot.datagramSocket.targetAddr ? slot.datagramSocket.targetAddr.nameLen : 0;
		auto ovl = slot.datagramSocket.write.mode == IOMode.immediate ? null : &slot.datagramSocket.write.overlapped.overlapped;
		auto handler = &overlappedIOHandler!(onIOSendCompleted, DWORD);
		auto ret = () @trusted { return WSASendTo(slot.common.fd, &slot.datagramSocket.write.wsabuf[0], slot.datagramSocket.write.wsabuf.length, null, 0, tan, tal, ovl, handler); } ();
		if (ret == SOCKET_ERROR) {
			auto err = WSAGetLastError();
			if (err == WSA_IO_PENDING) {
				if (slot.datagramSocket.write.mode == IOMode.immediate) {
					invokeCallback(IOStatus.wouldBlock, 0);
				}
			} else {
				invokeCallback(IOStatus.error, 0);
			}
		}
	}

	/** Checks whether `handle` refers to a currently registered socket.

		The handle's value must lie within the slot vector and its validation
		counter must match the one stored in the slot.
	*/
	override bool isValid(SocketFD handle)
	const {
		if (handle.value >= m_sockets.length) return false;
		return handle.validationCounter == m_sockets[handle].common.validationCounter;
	}

	/// Increments the reference count of a valid socket handle.
	override void addRef(SocketFD fd)
	{
		if (!isValid(fd)) return;

		assert(m_sockets[fd].common.refCount > 0, "Adding reference to unreferenced socket FD.");
		m_sockets[fd].common.refCount++;
	}

	/** Decrements the reference count of a socket handle.

		When the count reaches zero, pending operations are cancelled, the
		slot is cleared and the socket is closed.

		Returns: `false` if the last reference was released and the socket
		got closed, `true` otherwise (including for invalid handles).
	*/
	override bool releaseRef(SocketFD fd)
	@nogc {
		import taggedalgebraic : hasType;

		if (!isValid(fd)) return true;

		auto slot = () @trusted { return &m_sockets[fd]; } ();
		nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
		if (--slot.common.refCount == 0) {
			// cancel whatever is still pending for the specific socket kind
			final switch (slot.specific.kind) with (SocketVector.FieldType) {
				case Kind.none: break;
				case Kind.streamSocket:
					cancelRead(cast(StreamSocketFD)fd);
					cancelWrite(cast(StreamSocketFD)fd);
					m_core.discardEvents(&slot.streamSocket.read.overlapped, &slot.streamSocket.write.overlapped);
					break;
				case Kind.streamListen:
					// balances the addWaiter() call in waitForConnections
					if (m_sockets[fd].streamListen.acceptCallback)
						m_core.removeWaiter();
					break;
				case Kind.datagramSocket:
					cancelReceive(cast(DatagramSocketFD)fd);
					cancelSend(cast(DatagramSocketFD)fd);
					m_core.discardEvents(&slot.datagramSocket.read.overlapped, &slot.datagramSocket.write.overlapped);
					break;
			}

			clearSocketSlot(fd);
			() @trusted { closesocket(fd); } ();
			return false;
		}
		return true;
	}

	/** Sets a boolean option on a datagram socket.

		Returns: `true` if `setsockopt` succeeded, `false` otherwise or for
		an invalid handle.
	*/
	final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
	{
		if (!isValid(socket)) return false;

		int proto, opt;
		final switch (option) {
			case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
			case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break;
		}
		int tmp = enable;
		return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
	}

	/** Sets a boolean option on a stream socket.

		Returns: `true` if `setsockopt` succeeded, `false` otherwise or for
		an invalid handle.
	*/
	final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
	{
		if (!isValid(socket)) return false;

		int proto, opt;
		final switch (option) {
			case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
			case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break;
		}
		int tmp = enable;
		return () @trusted { return setsockopt(cast(SOCKET)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
	}

	/// Forwards to `rawUserDataImpl` for stream sockets.
	final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/// Forwards to `rawUserDataImpl` for listening sockets.
	final protected override void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/// Forwards to `rawUserDataImpl` for datagram sockets.
	final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return rawUserDataImpl(descriptor, size, initialize, destroy);
	}

	/** Returns a pointer to the user data area of a socket slot.

		The area is initialized with `initialize` on first access and
		`destroy` is remembered as its destructor.

		Returns: Pointer to the user data, or `null` for an invalid handle.
	*/
	private void* rawUserDataImpl(SocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
@system @nogc { 930 if (!isValid(descriptor)) return null; 931 932 SocketSlot* fds = &m_sockets[descriptor].common; 933 assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, 934 "Requesting user data with differing type (destructor)."); 935 assert(size <= SocketSlot.userData.length, "Requested user data is too large."); 936 if (size > SocketSlot.userData.length) assert(false); 937 if (!fds.userDataDestructor) { 938 initialize(fds.userData.ptr); 939 fds.userDataDestructor = destroy; 940 } 941 return fds.userData.ptr; 942 } 943 944 private FDType initSocketSlot(FDType)(SOCKET socket) 945 { 946 m_socketCount++; 947 m_sockets[socket].common.refCount = 1; 948 auto vc = ++m_sockets[socket].common.validationCounter; 949 auto fd = FDType(socket, vc); 950 m_sockets[socket].common.fd = fd; 951 m_sockets[socket].common.driver = this; 952 953 return fd; 954 } 955 956 package void clearSocketSlot(FD fd) 957 @nogc { 958 m_socketCount--; 959 auto slot = () @trusted { return &m_sockets[fd]; } (); 960 if (slot.common.userDataDestructor) 961 () @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } (); 962 *slot = m_sockets.FullField.init; 963 } 964 965 private static nothrow extern(System) 966 LRESULT onMessage(HWND wnd, UINT msg, WPARAM wparam, LPARAM lparam) 967 { 968 auto driver = () @trusted { return cast(WinAPIEventDriverSockets)cast(void*)GetWindowLongPtrA(wnd, GWLP_USERDATA); } (); 969 switch (msg) { 970 default: break; 971 case WM_USER_SOCKET: 972 SOCKET sock = cast(SOCKET)wparam; 973 auto evt = () @trusted { return LOWORD(lparam); } (); 974 auto err = () @trusted { return HIWORD(lparam); } (); 975 auto slot = () @trusted { return &driver.m_sockets[sock]; } (); 976 final switch (slot.specific.kind) with (SocketVector.FieldType) { 977 case Kind.none: break; 978 case Kind.streamSocket: 979 switch (evt) { 980 default: break; 981 case FD_CONNECT: 982 auto cb = slot.streamSocket.connectCallback; 983 if (!cb) break; // cancelled connect? 
984 985 auto fd = StreamSocketFD(sock, slot.common.validationCounter); 986 987 slot.streamSocket.connectCallback = null; 988 slot.common.driver.m_core.removeWaiter(); 989 if (err) { 990 slot.streamSocket.state = ConnectionState.closed; 991 cb(fd, ConnectStatus.refused); 992 } else { 993 slot.streamSocket.state = ConnectionState.connected; 994 cb(fd, ConnectStatus.connected); 995 } 996 break; 997 case FD_READ: 998 break; 999 case FD_WRITE: 1000 break; 1001 } 1002 break; 1003 case Kind.streamListen: 1004 if (evt == FD_ACCEPT) { 1005 /* 1006 sock_t sockfd; 1007 sockaddr_storage addr; 1008 socklen_t addr_len = addr.sizeof; 1009 () @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } (); 1010 if (sockfd == -1) break; 1011 1012 setSocketNonBlocking(cast(SocketFD)sockfd); 1013 auto fd = cast(StreamSocketFD)sockfd; 1014 initSocketSlot(fd); 1015 m_sockets[fd].specific = StreamSocketSlot.init; 1016 m_sockets[fd].streamSocket.state = ConnectionState.connected; 1017 m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status); 1018 m_loop.setNotifyCallback!(EventType.status)(fd, &onConnectError); 1019 releaseRef(fd); // setNotifyCallback adds a reference, but waiting for status/disconnect should not affect the ref count 1020 //print("accept %d", sockfd); 1021 scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); 1022 m_sockets[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc); 1023 */ 1024 SOCKADDR_STORAGE addr; 1025 socklen_t addr_len = addr.sizeof; 1026 auto clientsockfd = () @trusted { return WSAAccept(sock, cast(sockaddr*)&addr, &addr_len, null, 0); } (); 1027 if (clientsockfd == INVALID_SOCKET) return 0; 1028 auto clientsock = driver.adoptStreamInternal(clientsockfd, ConnectionState.connected); 1029 scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len); 1030 auto fd = 
StreamListenSocketFD(sock, slot.common.validationCounter); 1031 slot.streamListen.acceptCallback(fd, clientsock, addrc); 1032 } 1033 break; 1034 case Kind.datagramSocket: 1035 break; 1036 } 1037 return 0; 1038 } 1039 return () @trusted { return DefWindowProcA(wnd, msg, wparam, lparam); } (); 1040 } 1041 } 1042 1043 void setupWindowClass() 1044 @trusted nothrow @nogc { 1045 static __gshared registered = false; 1046 1047 if (registered) return; 1048 1049 WNDCLASSA wc; 1050 wc.lpfnWndProc = &WinAPIEventDriverSockets.onMessage; 1051 wc.lpszClassName = "VibeWin32MessageWindow"; 1052 RegisterClassA(&wc); 1053 registered = true; 1054 } 1055 1056 static struct SocketSlot { 1057 SocketFD fd; // redundant, but needed by the current IO Completion Routines based approach 1058 WinAPIEventDriverSockets driver; // redundant, but needed by the current IO Completion Routines based approach 1059 @property inout(WinAPIEventDriverCore) core() @safe nothrow inout { return driver.m_core; } 1060 int refCount; 1061 uint validationCounter; 1062 DataInitializer userDataDestructor; 1063 ubyte[16*size_t.sizeof] userData; 1064 } 1065 1066 private struct StreamSocketSlot { 1067 alias Handle = StreamSocketFD; 1068 StreamDirection!true write; 1069 StreamDirection!false read; 1070 ConnectCallback connectCallback; 1071 ConnectionState state; 1072 } 1073 1074 static struct StreamDirection(bool RO) { 1075 OVERLAPPED_CORE overlapped; 1076 static if (RO) const(ubyte)[] buffer; 1077 else ubyte[] buffer; 1078 WSABUF[1] wsabuf; 1079 size_t bytesTransferred; 1080 IOMode mode; 1081 IOCallback callback; 1082 1083 void reset() { buffer = null; wsabuf[0] = WSABUF.init; callback = null; } 1084 } 1085 1086 private struct StreamListenSocketSlot { 1087 alias Handle = StreamListenSocketFD; 1088 AcceptCallback acceptCallback; 1089 } 1090 1091 private struct DatagramSocketSlot { 1092 alias Handle = DatagramSocketFD; 1093 DgramDirection!true write; 1094 DgramDirection!false read; 1095 Address targetAddr; 1096 
SOCKADDR_STORAGE sourceAddr; 1097 INT sourceAddrLen; 1098 } 1099 1100 static struct DgramDirection(bool RO) { 1101 OVERLAPPED_CORE overlapped; 1102 static if (RO) const(ubyte)[] buffer; 1103 else ubyte[] buffer; 1104 WSABUF[1] wsabuf; 1105 size_t bytesTransferred; 1106 IOMode mode; 1107 DatagramIOCallback callback; 1108 1109 void reset() { buffer = null; wsabuf[0] = WSABUF.init; callback = null; } 1110 }