/**
	Socket support (stream, listen and datagram sockets) for the POSIX style
	event drivers.
*/
module eventcore.drivers.posix.sockets;
@safe:

import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.utils;

import std.algorithm.comparison : among, min, max;
import std.socket : Address, AddressFamily, InternetAddress, Internet6Address, UnknownAddress;

import core.time: Duration;

version (Posix) {
	import std.socket : UnixAddress;
	import core.sys.posix.netdb : AI_ADDRCONFIG, AI_V4MAPPED, addrinfo, freeaddrinfo, getaddrinfo;
	import core.sys.posix.netinet.in_;
	import core.sys.posix.netinet.tcp;
	import core.sys.posix.sys.un;
	import core.sys.posix.unistd : close, read, write;
	import core.stdc.errno;
	import core.sys.posix.fcntl;
	import core.sys.posix.sys.socket;

	version (linux) enum SO_REUSEPORT = 15;
	else enum SO_REUSEPORT = 0x200;

	// Fallback definitions for runtimes that do not declare O_CLOEXEC
	static if (!is(typeof(O_CLOEXEC)))
	{
		version (linux) enum O_CLOEXEC = 0x80000;
		else version (FreeBSD) enum O_CLOEXEC = 0x100000;
		else version (Solaris) enum O_CLOEXEC = 0x800000;
		else version (DragonFlyBSD) enum O_CLOEXEC = 0x0020000;
		else version (NetBSD) enum O_CLOEXEC = 0x400000;
		else version (OpenBSD) enum O_CLOEXEC = 0x10000;
		else version (OSX) enum O_CLOEXEC = 0x1000000;
	}
}
version (linux) {
	extern (C) int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags) nothrow @nogc;
	static if (!is(typeof(SOCK_NONBLOCK)))
		enum SOCK_NONBLOCK = 0x800;
	static if (!is(typeof(SOCK_CLOEXEC)))
		enum SOCK_CLOEXEC = 0x80000;

	import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;

	// Linux-specific TCP options
	// https://github.com/torvalds/linux/blob/master/include/uapi/linux/tcp.h#L95
	// Some day we should simply import core.sys.linux.netinet.tcp;
	static if (!is(typeof(SOL_TCP)))
		enum SOL_TCP = 6;
	static if (!is(typeof(TCP_KEEPIDLE)))
		enum TCP_KEEPIDLE = 4;
	static if (!is(typeof(TCP_KEEPINTVL)))
		enum TCP_KEEPINTVL = 5;
	static if (!is(typeof(TCP_KEEPCNT)))
		enum TCP_KEEPCNT = 6;
	static if (!is(typeof(TCP_USER_TIMEOUT)))
		enum TCP_USER_TIMEOUT = 18;
}
version (OSX) {
	import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;

	static if (!is(typeof(ESHUTDOWN))) enum ESHUTDOWN = 58;
}
version (FreeBSD) {
	import core.sys.freebsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
}
version (DragonFlyBSD) {
	import core.sys.dragonflybsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
}
version (Solaris) {
	enum IP_ADD_MEMBERSHIP = 0x13;
	enum IP_MULTICAST_LOOP = 0x12;
}
version (Windows) {
	import core.sys.windows.windows;
	import core.sys.windows.winsock2;
	alias sockaddr_storage = SOCKADDR_STORAGE;

	// Map the errno names used below onto their WinSock counterparts
	alias EAGAIN = WSAEWOULDBLOCK;
	alias ECONNREFUSED = WSAECONNREFUSED;
	alias EPIPE = WSAECONNABORTED;
	alias ECONNRESET = WSAECONNRESET;
	alias ENETRESET = WSAENETRESET;
	alias ENOTCONN = WSAENOTCONN;
	alias ETIMEDOUT = WSAETIMEDOUT;
	alias ESHUTDOWN = WSAESHUTDOWN;

	enum SHUT_RDWR = SD_BOTH;
	enum SHUT_RD = SD_RECEIVE;
	enum SHUT_WR = SD_SEND;

	extern (C) int read(int fd, void *buffer, uint count) nothrow;
	extern (C) int write(int fd, const(void) *buffer, uint count) nothrow;
	extern (C) int close(int fd) nothrow @safe;
}

version (Android) {
	static if (!is(typeof(MSG_NOSIGNAL)))
		enum MSG_NOSIGNAL = 0x4000;
}

// Flags passed to every send()/sendto() call issued by this module
version (Posix) {
	version (OSX) {
		enum SEND_FLAGS = 0;
	} else {
		enum SEND_FLAGS = MSG_NOSIGNAL;
	}
} else {
	enum SEND_FLAGS = 0;
}


/**
	`EventDriverSockets` implementation on top of a `PosixEventLoop`.

	All per-socket state is kept in the FD slots of the loop (`m_loop.m_fds`).
*/
final class PosixEventDriverSockets(Loop : PosixEventLoop) : EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
	private Loop m_loop;

	this(Loop loop) { m_loop = loop; }

	/**
		Creates a stream socket and starts connecting it to `address`.

		If `bind_address` is non-null, the socket is bound to it first.
		`on_connect` is invoked either immediately (connect finished or failed
		synchronously) or later from `onConnect`.

		Returns: The new socket handle, or `StreamSocketFD.invalid` on failure.
	*/
	final override StreamSocketFD connectStream(scope Address address, scope Address bind_address, ConnectCallback on_connect)
	{
		assert(on_connect !is null);

		// @trusted to escape DIP1000's `scope` check
		auto sockfd = () @trusted { return createSocket(address.addressFamily, SOCK_STREAM); }();
		if (sockfd == -1) {
			on_connect(StreamSocketFD.invalid, ConnectStatus.socketCreateFailure);
			return StreamSocketFD.invalid;
		}

		int bret;
		if (bind_address !is null)
			() @trusted { bret = bind(sockfd, bind_address.name, bind_address.nameLen); } ();

		if (bret != 0) {
			closeSocket(sockfd);
			on_connect(StreamSocketFD.invalid, ConnectStatus.bindFailure);
			return StreamSocketFD.invalid;
		}

		auto sock = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init);
		m_loop.registerFD(sock, EventMask.read|EventMask.write);

		auto ret = () @trusted { return connect(cast(sock_t)sock, address.name, address.nameLen); } ();
		if (ret == 0) {
			m_loop.m_fds[sock].streamSocket.state = ConnectionState.connected;
			on_connect(sock, ConnectStatus.connected);
		} else {
			auto err = getSocketError();
			if (err.among!(EAGAIN, EINPROGRESS)) {
				// connect is in progress - completion is signalled as a write event
				with (m_loop.m_fds[sock].streamSocket) {
					connectCallback = on_connect;
					state = ConnectionState.connecting;
				}
				m_loop.setNotifyCallback!(EventType.write)(sock, &onConnect);
			} else {
				m_loop.unregisterFD(sock, EventMask.read|EventMask.write);
				m_loop.clearFD!StreamSocketSlot(sock);
				closeSocket(sockfd);
				on_connect(StreamSocketFD.invalid, determineConnectStatus(err));
				return StreamSocketFD.invalid;
			}
		}

		return sock;
	}

	/**
		Cancels a pending connect started by `connectStream`.

		The socket must be in the `connecting` state. The connect callback is
		dropped and the state is set to `closed`. Invalid handles are ignored.
	*/
	final override void cancelConnectStream(StreamSocketFD sock)
	{
		if (!isValid(sock)) return;

		with (m_loop.m_fds[sock].streamSocket)
		{
			assert(state == ConnectionState.connecting,
				"Unable to cancel connect on the socket that is not in connecting state");
			state = ConnectionState.closed;
			connectCallback = null;
			m_loop.setNotifyCallback!(EventType.write)(sock, null);
		}
	}

	/**
		Takes over an existing OS socket as a stream socket.

		Returns: The new handle, or `StreamSocketFD.invalid` if the FD slot is
			already referenced.
	*/
	final override StreamSocketFD adoptStream(int socket)
	{
		if (m_loop.m_fds[socket].common.refCount) // FD already in use?
			return StreamSocketFD.invalid;
		setSocketNonBlocking(socket);
		auto fd = m_loop.initFD!StreamSocketFD(socket, FDFlags.none, StreamSocketSlot.init);
		m_loop.registerFD(fd, EventMask.read|EventMask.write);
		return fd;
	}

	/// Write-event handler that finishes an asynchronous connect.
	private void onConnect(FD fd)
	{
		auto sock = cast(StreamSocketFD)fd;
		auto l = lockHandle(sock);
		m_loop.setNotifyCallback!(EventType.write)(sock, null);

		// the outcome of the connect is reported through SO_ERROR
		ConnectStatus status = ConnectStatus.unknownError;
		int err;
		socklen_t errlen = err.sizeof;
		if (() @trusted { return getsockopt(cast(sock_t)fd, SOL_SOCKET, SO_ERROR, &err, &errlen); } () == 0)
			status = determineConnectStatus(err);

		with (m_loop.m_fds[sock].streamSocket) {
			assert(state == ConnectionState.connecting);

			state = status == ConnectStatus.connected
				? ConnectionState.connected
				: ConnectionState.closed;

			// reset the stored callback before invoking it
			auto cb = connectCallback;
			connectCallback = null;
			if (cb) cb(cast(StreamSocketFD)sock, status);
		}
	}

	/// Maps a socket error code to the corresponding `ConnectStatus`.
	private ConnectStatus determineConnectStatus(int sock_err)
	{
		switch (sock_err) {
			default: return ConnectStatus.unknownError;
			case 0: return ConnectStatus.connected;
			case ECONNREFUSED: return ConnectStatus.refused;
		}
	}

	alias listenStream = EventDriverSockets.listenStream;

	/**
		Creates a listening stream socket bound to `address`.

		`options` controls `SO_REUSEADDR`/`SO_REUSEPORT` (the latter is skipped
		on Windows). If `on_accept` is non-null, accepting starts immediately.

		Returns: The listening socket, or `StreamListenSocketFD.invalid` if any
			of the setup steps failed.
	*/
	final override StreamListenSocketFD listenStream(scope Address address, StreamListenOptions options, AcceptCallback on_accept)
	{
		// @trusted to escape DIP1000's `scope` check
		auto sockfd = () @trusted { return createSocket(address.addressFamily, SOCK_STREAM); }();
		if (sockfd == -1) return StreamListenSocketFD.invalid;

		auto succ = () @trusted {
			int tmp_reuse = 1;
			// any failing step aborts the setup; the specific error is not reported
			if (options & StreamListenOptions.reuseAddress) {
				if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0)
					return false;
			}

			version (Windows) {}
			else {
				if ((options & StreamListenOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0)
					return false;
			}

			if (bind(sockfd, address.name, address.nameLen) != 0)
				return false;

			if (listen(sockfd, getBacklogSize()) != 0)
				return false;

			return true;
		} ();

		if (!succ) {
			closeSocket(sockfd);
			return StreamListenSocketFD.invalid;
		}

		auto sock = m_loop.initFD!StreamListenSocketFD(sockfd, FDFlags.none, StreamListenSocketSlot.init);

		if (on_accept) waitForConnections(sock, on_accept);

		return sock;
	}

	/**
		Starts accepting connections on `sock`, reporting each one to
		`on_accept`. Invalid handles are ignored.
	*/
	final override void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept)
	{
		if (!isValid(sock)) return;

		m_loop.registerFD(sock, EventMask.read, false);
		m_loop.m_fds[sock].streamListen.acceptCallback = on_accept;
		m_loop.setNotifyCallback!(EventType.read)(sock, &onAccept);
		// try to accept right away in case a connection is already pending
		onAccept(sock);
	}

	/// Read-event handler of listening sockets: accepts a single connection.
	private void onAccept(FD listenfd)
	{
		sock_t sockfd;
		sockaddr_storage addr;
		socklen_t addr_len = addr.sizeof;
		version (linux) {
			// accept4 sets the non-blocking and close-on-exec flags atomically
			() @trusted { sockfd = accept4(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); } ();
			if (sockfd == -1) return;
		} else {
			() @trusted { sockfd = accept(cast(sock_t)listenfd, () @trusted { return cast(sockaddr*)&addr; } (), &addr_len); } ();
			if (sockfd == -1) return;
			setSocketNonBlocking(sockfd, true);
		}
		auto fd = m_loop.initFD!StreamSocketFD(sockfd, FDFlags.none, StreamSocketSlot.init);
		m_loop.m_fds[fd].streamSocket.state = ConnectionState.connected;
		m_loop.registerFD(fd, EventMask.read|EventMask.write);
		//print("accept %d", sockfd);
		scope RefAddress addrc = new RefAddress(() @trusted { return cast(sockaddr*)&addr; } (), addr_len);
		m_loop.m_fds[listenfd].streamListen.acceptCallback(cast(StreamListenSocketFD)listenfd, fd, addrc);
	}

	/// Returns the tracked connection state, or `closed` for invalid handles.
	ConnectionState getConnectionState(StreamSocketFD sock)
	{
		if (!isValid(sock)) return ConnectionState.closed;
		return m_loop.m_fds[sock].streamSocket.state;
	}

	/**
		Writes the local address of `sock` (`getsockname`) into `dst`.

		Returns: `false` for invalid handles or if the system call fails.
	*/
	final override bool getLocalAddress(SocketFD sock, scope RefAddress dst)
	{
		if (!isValid(sock)) return false;

		socklen_t addr_len = dst.nameLen;
		if (() @trusted { return getsockname(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
			return false;
		dst.cap(addr_len);
		return true;
	}

	/**
		Writes the peer address of `sock` (`getpeername`) into `dst`.

		Returns: `false` for invalid handles or if the system call fails.
	*/
	final override bool getRemoteAddress(SocketFD sock, scope RefAddress dst)
	{
		if (!isValid(sock)) return false;

		socklen_t addr_len = dst.nameLen;
		if (() @trusted { return getpeername(cast(sock_t)sock, dst.name, &addr_len); } () != 0)
			return false;
		dst.cap(addr_len);
		return true;
	}

	/// Sets `TCP_NODELAY`; the result of `setsockopt` is not checked.
	final override void setTCPNoDelay(StreamSocketFD socket, bool enable)
	{
		if (!isValid(socket)) return;

		int opt = enable;
		() @trusted { setsockopt(cast(sock_t)socket, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
	}

	/// Sets `SO_KEEPALIVE`; failures are only printed.
	override void setKeepAlive(StreamSocketFD socket, bool enable) @trusted
	{
		if (!isValid(socket)) return;

		int opt = enable;
		int err = setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_KEEPALIVE, &opt, int.sizeof);
		if (err != 0)
			print("sock error in setKeepAlive: %s", getSocketError);
	}

	/**
		Enables keep-alive and configures idle time, probe interval (both in
		whole seconds) and probe count.

		Only implemented for Linux; a no-op on all other platforms.
	*/
	override void setKeepAliveParams(StreamSocketFD socket, Duration idle, Duration interval, int probeCount) @trusted
	{
		if (!isValid(socket)) return;

		// not implemented for BSD/OSX yet
		version (linux) {
			setKeepAlive(socket, true);
			int int_opt = cast(int) idle.total!"seconds"();
			int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPIDLE, &int_opt, int.sizeof);
			if (err != 0) {
				print("sock error on setsockopt TCP_KEEPIDLE: %s", getSocketError);
				return;
			}
			int_opt = cast(int) interval.total!"seconds"();
			err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPINTVL, &int_opt, int.sizeof);
			if (err != 0) {
				print("sock error on setsockopt TCP_KEEPINTVL: %s", getSocketError);
				return;
			}
			err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_KEEPCNT, &probeCount, int.sizeof);
			if (err != 0)
				print("sock error on setsockopt TCP_KEEPCNT: %s", getSocketError);
		}
	}

	/// Sets `TCP_USER_TIMEOUT` (milliseconds). Linux only, no-op elsewhere.
	override void setUserTimeout(StreamSocketFD socket, Duration timeout) @trusted
	{
		if (!isValid(socket)) return;

		version (linux) {
			uint tmsecs = cast(uint) timeout.total!"msecs";
			int err = setsockopt(cast(sock_t)socket, SOL_TCP, TCP_USER_TIMEOUT, &tmsecs, uint.sizeof);
			if (err != 0)
				print("sock error on setsockopt TCP_USER_TIMEOUT %s", getSocketError);
		}
	}

	/**
		Reads from a stream socket into `buffer`.

		A first `recv` is attempted synchronously. Depending on `mode` and the
		result, `on_read_finish` is either called right away or the remaining
		work is continued by `onSocketRead` when the socket becomes readable.
	*/
	final override void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish)
	{
		if (!isValid(socket)) {
			on_read_finish(socket, IOStatus.invalidHandle, 0);
			return;
		}

		/*if (buffer.length == 0) {
			on_read_finish(socket, IOStatus.ok, 0);
			return;
		}*/

		sizediff_t ret;
		() @trusted { ret = .recv(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0); } ();

		if (ret < 0) {
			auto err = getSocketError();
			if (err.among!(EAGAIN, EINPROGRESS)) {
				if (mode == IOMode.immediate) {
					on_read_finish(socket, IOStatus.wouldBlock, 0);
					return;
				}
			} else {
				auto st = handleReadError(err, m_loop.m_fds[socket].streamSocket);
				print("sock error %s!", err);
				on_read_finish(socket, st, 0);
				return;
			}
		}

		if (ret == 0 && buffer.length > 0) {
			// treat as if the connection read end was shut down
			handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
			on_read_finish(socket, IOStatus.disconnected, 0);
			return;
		}

		if (ret >= 0) {
			buffer = buffer[ret .. $];
			if (mode != IOMode.all || buffer.length == 0) {
				on_read_finish(socket, IOStatus.ok, ret);
				return;
			}
		}

		// NOTE: since we know that not all data was read from the stream
		//       socket, the next call to recv is guaranteed to return EAGAIN
		//       and we can avoid that call.

		with (m_loop.m_fds[socket].streamSocket) {
			readCallback = on_read_finish;
			readMode = mode;
			bytesRead = ret > 0 ? ret : 0;
			readBuffer = buffer;
		}

		m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketRead);
	}

	/// Cancels a pending `read`/`waitForData`; a read must be in progress.
	override void cancelRead(StreamSocketFD socket)
	{
		if (!isValid(socket)) return;

		assert(m_loop.m_fds[socket].streamSocket.readCallback !is null, "Cancelling read when there is no read in progress.");
		m_loop.setNotifyCallback!(EventType.read)(socket, null);
		with (m_loop.m_fds[socket].streamSocket) {
			readBuffer = null;
		}
	}

	/// Read-event handler continuing a read started by `read`.
	private void onSocketRead(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
		auto socket = cast(StreamSocketFD)fd;

		// stops listening for read events and reports the result
		void finalize()(IOStatus status)
		{
			auto l = lockHandle(socket);
			m_loop.setNotifyCallback!(EventType.read)(socket, null);
			assert(m_loop.m_fds[socket].common.refCount > 0);
			//m_fds[fd].readBuffer = null;
			slot.readCallback(socket, status, slot.bytesRead);
			assert(m_loop.m_fds[socket].common.refCount > 0);
		}

		while (true) {
			sizediff_t ret = 0;
			() @trusted { ret = .recv(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0); } ();
			if (ret < 0) {
				auto err = getSocketError();
				if (!err.among!(EAGAIN, EINPROGRESS)) {
					auto st = handleReadError(err, *slot);
					finalize(st);
					return;
				}
			}

			if (ret == 0 && slot.readBuffer.length) {
				// treat as if the connection read end was shut down
				handleReadError(ESHUTDOWN, m_loop.m_fds[socket].streamSocket);
				finalize(IOStatus.disconnected);
				return;
			}

			if (ret > 0 || !slot.readBuffer.length) {
				slot.bytesRead += ret;
				slot.readBuffer = slot.readBuffer[ret .. $];
				if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
					finalize(IOStatus.ok);
					return;
				}
			}

			// retry if this was just a partial read, as it could mean that
			// the connection was closed by the remote peer
			if (ret <= 0 || !slot.readBuffer.length) break;
		}
	}

	/**
		Translates a socket error seen while reading into an `IOStatus` and
		updates the connection state stored in `slot` accordingly.
	*/
	private static IOStatus handleReadError(int err, ref StreamSocketSlot slot)
	@safe nothrow {
		switch (err) {
			case 0: return IOStatus.ok;
			case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
				slot.state = ConnectionState.closed;
				return IOStatus.disconnected;
			case ESHUTDOWN:
				if (slot.state == ConnectionState.activeClose)
					slot.state = ConnectionState.closed;
				else if (slot.state != ConnectionState.closed)
					slot.state = ConnectionState.passiveClose;
				return IOStatus.disconnected;
			default: return IOStatus.error;
		}
	}


	/**
		Writes `buffer` to a stream socket.

		A first `send` is attempted synchronously. Depending on `mode` and the
		result, `on_write_finish` is either called right away or the operation
		is continued by `onSocketWrite` when the socket becomes writable.
	*/
	final override void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish)
	{
		if (!isValid(socket)) {
			on_write_finish(socket, IOStatus.invalidHandle, 0);
			return;
		}

		if (buffer.length == 0) {
			on_write_finish(socket, IOStatus.ok, 0);
			return;
		}

		sizediff_t ret;
		() @trusted { ret = .send(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS); } ();

		if (ret < 0) {
			auto err = getSocketError();
			if (err.among!(EAGAIN, EINPROGRESS)) {
				if (mode == IOMode.immediate) {
					on_write_finish(socket, IOStatus.wouldBlock, 0);
					return;
				}
			} else {
				auto st = handleWriteError(err, m_loop.m_fds[socket].streamSocket);
				on_write_finish(socket, st, 0);
				return;
			}
		}

		size_t bytes_written = 0;

		if (ret >= 0) {
			bytes_written += ret;
			buffer = buffer[ret .. $];
			if (mode != IOMode.all || buffer.length == 0) {
				on_write_finish(socket, IOStatus.ok, bytes_written);
				return;
			}
		}

		// NOTE: since we know that not all data was written to the stream
		//       socket, the next call to send is guaranteed to return EAGAIN
		//       and we can avoid that call.

		with (m_loop.m_fds[socket].streamSocket) {
			writeCallback = on_write_finish;
			writeMode = mode;
			bytesWritten = ret >= 0 ? ret : 0;
			writeBuffer = buffer;
		}

		m_loop.setNotifyCallback!(EventType.write)(socket, &onSocketWrite);
	}

	/// Cancels a pending `write`; a write must be in progress.
	override void cancelWrite(StreamSocketFD socket)
	{
		if (!isValid(socket)) return;

		assert(m_loop.m_fds[socket].streamSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
		m_loop.setNotifyCallback!(EventType.write)(socket, null);
		m_loop.m_fds[socket].streamSocket.writeBuffer = null;
	}

	/// Write-event handler continuing a write started by `write`.
	private void onSocketWrite(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
		auto socket = cast(StreamSocketFD)fd;

		sizediff_t ret;
		() @trusted { ret = .send(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), SEND_FLAGS); } ();

		if (ret < 0) {
			auto err = getSocketError();
			if (!err.among!(EAGAIN, EINPROGRESS)) {
				auto l = lockHandle(socket);
				m_loop.setNotifyCallback!(EventType.write)(socket, null);
				auto st = handleWriteError(err, *slot);
				// report the number of bytes written so far (not bytesRead)
				slot.writeCallback(socket, st, slot.bytesWritten);
				return;
			}
		}

		if (ret >= 0) {
			slot.bytesWritten += ret;
			slot.writeBuffer = slot.writeBuffer[ret .. $];
			if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
				auto l = lockHandle(socket);
				m_loop.setNotifyCallback!(EventType.write)(socket, null);
				slot.writeCallback(cast(StreamSocketFD)socket, IOStatus.ok, slot.bytesWritten);
				return;
			}
		}
	}

	/**
		Translates a socket error seen while writing into an `IOStatus` and
		updates the connection state stored in `slot` accordingly.
	*/
	private static IOStatus handleWriteError(int err, ref StreamSocketSlot slot)
	@safe nothrow {
		switch (err) {
			case 0: return IOStatus.ok;
			case EPIPE, ECONNRESET, ENETRESET, ENOTCONN, ETIMEDOUT:
				slot.state = ConnectionState.closed;
				return IOStatus.disconnected;
			case ESHUTDOWN:
				if (slot.state == ConnectionState.passiveClose)
					slot.state = ConnectionState.closed;
				else if (slot.state != ConnectionState.closed)
					slot.state = ConnectionState.activeClose;
				return IOStatus.disconnected;
			default: return IOStatus.error;
		}
	}


	/**
		Waits until data can be read from `socket` without consuming it.

		Uses a one byte `MSG_PEEK` read. The callback always receives a byte
		count of zero; only the status carries information.
	*/
	final override void waitForData(StreamSocketFD socket, IOCallback on_data_available)
	{
		if (!isValid(socket)) {
			on_data_available(socket, IOStatus.invalidHandle, 0);
			return;
		}

		sizediff_t ret;
		ubyte dummy;
		() @trusted { ret = recv(cast(sock_t)socket, &dummy, 1, MSG_PEEK); } ();

		if (ret < 0) {
			auto err = getSocketError();
			if (!err.among!(EAGAIN, EINPROGRESS)) {
				on_data_available(socket, IOStatus.error, 0);
				return;
			}
		}

		if (ret == 0) {
			on_data_available(socket, IOStatus.disconnected, 0);
			return;
		}

		if (ret > 0) {
			on_data_available(socket, IOStatus.ok, 0);
			return;
		}

		// nothing available yet - wait for the next read event
		with (m_loop.m_fds[socket].streamSocket) {
			readCallback = on_data_available;
			readMode = IOMode.once;
			bytesRead = 0;
			readBuffer = null;
		}

		m_loop.setNotifyCallback!(EventType.read)(socket, &onSocketDataAvailable);
	}

	/// Read-event handler for a pending `waitForData`.
	private void onSocketDataAvailable(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].streamSocket(); } ();
		auto socket = cast(StreamSocketFD)fd;

		void finalize()(IOStatus status)
		{
			auto l = lockHandle(socket);
			m_loop.setNotifyCallback!(EventType.read)(socket, null);
			//m_fds[fd].readBuffer = null;
			slot.readCallback(socket, status, 0);
		}

		sizediff_t ret;
		ubyte tmp;
		() @trusted { ret = recv(cast(sock_t)socket, &tmp, 1, MSG_PEEK); } ();
		if (ret < 0) {
			auto err = getSocketError();
			// keep waiting if the wake-up was spurious
			if (!err.among!(EAGAIN, EINPROGRESS)) finalize(IOStatus.error);
		} else finalize(ret ? IOStatus.ok : IOStatus.disconnected);
	}

	/**
		Shuts down the read and/or write direction of a stream socket and
		updates the tracked connection state.

		If neither direction is requested, no system call is issued.
	*/
	final override void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write)
	{
		if (!isValid(socket)) return;

		auto st = m_loop.m_fds[socket].streamSocket.state;
		// a "how" value of 0 equals SHUT_RD/SD_RECEIVE, so the call must be
		// skipped entirely when no direction was requested
		if (shut_read || shut_write) {
			() @trusted { .shutdown(cast(sock_t)socket, shut_read ? shut_write ? SHUT_RDWR : SHUT_RD : SHUT_WR); } ();
		}
		// merge with the directions that have already been closed before
		if (st == ConnectionState.passiveClose) shut_read = true;
		if (st == ConnectionState.activeClose) shut_write = true;
		m_loop.m_fds[socket].streamSocket.state = shut_read ? shut_write ? ConnectionState.closed : ConnectionState.passiveClose : shut_write ? ConnectionState.activeClose : ConnectionState.connected;
	}

	/// Creates a (non-internal) datagram socket, see `createDatagramSocketInternal`.
	final override DatagramSocketFD createDatagramSocket(scope Address bind_address,
		scope Address target_address, DatagramCreateOptions options = DatagramCreateOptions.init)
	{
		return createDatagramSocketInternal(bind_address, target_address, options, false);
	}

	/**
		Creates a datagram socket, optionally bound to `bind_address` and
		connected to `target_address`.

		If `target_address` is the same object as `bind_address`, the socket is
		connected to the address it was actually bound to.

		Returns: The new handle, or an invalid handle if any step fails or if
			both addresses are null.
	*/
	package DatagramSocketFD createDatagramSocketInternal(scope Address bind_address,
		scope Address target_address, DatagramCreateOptions options = DatagramCreateOptions.init,
		bool is_internal = true)
	{
		// the address family is taken from whichever address is available
		if (bind_address is null && target_address is null)
			return DatagramSocketFD.invalid;

		// @trusted to escape DIP1000's `scope` check
		auto sockfd = () @trusted {
			auto family_source = bind_address !is null ? bind_address : target_address;
			return createSocket(family_source.addressFamily, SOCK_DGRAM);
		}();
		if (sockfd == -1) return DatagramSocketFD.invalid;

		auto optsucc = () @trusted {
			int tmp_reuse = 1;
			// any failing option aborts the setup; the specific error is not reported
			if (options & DatagramCreateOptions.reuseAddress) {
				if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) != 0)
					return false;
			}

			version (Windows) {}
			else {
				if ((options & DatagramCreateOptions.reusePort) && setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof) != 0)
					return false;
			}

			return true;
		} ();

		if (!optsucc) {
			closeSocket(sockfd);
			return DatagramSocketFD.init;
		}


		if (bind_address && () @trusted { return bind(sockfd, bind_address.name, bind_address.nameLen); } () != 0) {
			closeSocket(sockfd);
			return DatagramSocketFD.init;
		}

		if (target_address) {
			int ret;
			if (target_address is bind_address) {
				// special case of bind_address==target_address: determine the actual bind address
				// in case of a zero port
				sockaddr_storage sa;
				socklen_t addr_len = sa.sizeof;
				if (() @trusted { return getsockname(sockfd, cast(sockaddr*)&sa, &addr_len); } () != 0) {
					closeSocket(sockfd);
					return DatagramSocketFD.init;
				}

				ret = () @trusted { return connect(sockfd, cast(sockaddr*)&sa, addr_len); } ();
			} else ret = () @trusted { return connect(sockfd, target_address.name, target_address.nameLen); } ();

			if (ret != 0) {
				closeSocket(sockfd);
				return DatagramSocketFD.init;
			}
		}

		auto flags = is_internal ? FDFlags.internal : FDFlags.none;
		auto sock = m_loop.initFD!DatagramSocketFD(sockfd, flags, DgramSocketSlot.init);
		m_loop.registerFD(sock, EventMask.read|EventMask.write);
		return sock;
	}

	/// Takes over an existing OS socket as a (non-internal) datagram socket.
	final override DatagramSocketFD adoptDatagramSocket(int socket)
	{
		return adoptDatagramSocketInternal(socket, false);
	}

	/**
		Takes over an existing OS socket as a datagram socket.

		Returns: The new handle, or `DatagramSocketFD.init` if the FD slot is
			already referenced.
	*/
	package DatagramSocketFD adoptDatagramSocketInternal(int socket, bool is_internal = true, bool close_on_exec = false)
	@nogc {
		if (m_loop.m_fds[socket].common.refCount) // FD already in use?
			return DatagramSocketFD.init;
		setSocketNonBlocking(socket, close_on_exec);
		auto flags = is_internal ? FDFlags.internal : FDFlags.none;
		auto fd = m_loop.initFD!DatagramSocketFD(socket, flags, DgramSocketSlot.init);
		m_loop.registerFD(fd, EventMask.read|EventMask.write);
		return fd;
	}

	/// Connects the datagram socket to `target_address`; the result is not checked.
	final override void setTargetAddress(DatagramSocketFD socket, scope Address target_address)
	{
		if (!isValid(socket)) return;

		() @trusted { connect(cast(sock_t)socket, target_address.name, target_address.nameLen); } ();
	}

	/// Sets `SO_BROADCAST`. Returns `true` on success.
	final override bool setBroadcast(DatagramSocketFD socket, bool enable)
	{
		if (!isValid(socket)) return false;

		int tmp_broad = enable;
		return () @trusted { return setsockopt(cast(sock_t)socket, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0;
	}

	/**
		Joins the multicast group given by `multicast_address`.

		Only IPv4 and IPv6 addresses are supported.

		Returns: `true` if the membership was added successfully.
	*/
	final override bool joinMulticastGroup(DatagramSocketFD socket, scope Address multicast_address, uint interface_index = 0)
	{
		if (!isValid(socket)) return false;

		// @trusted to escape DIP1000's `scope` check
		switch (() @trusted { return multicast_address.addressFamily; }()) {
			default: assert(false, "Multicast only supported for IPv4/IPv6 sockets.");
			case AddressFamily.INET:
				struct ip_mreq {
					in_addr imr_multiaddr;   /* IP multicast address of group */
					in_addr imr_interface;   /* local IP address of interface */
				}
				auto addr = () @trusted { return cast(sockaddr_in*)multicast_address.name; } ();
				ip_mreq mreq;
				mreq.imr_multiaddr = addr.sin_addr;
				mreq.imr_interface.s_addr = htonl(interface_index);
				return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0;
			case AddressFamily.INET6:
				version (Windows) {
					struct ipv6_mreq {
						in6_addr ipv6mr_multiaddr;
						uint ipv6mr_interface;
					}
				}
				auto addr = () @trusted { return cast(sockaddr_in6*)multicast_address.name; } ();
				ipv6_mreq mreq;
				mreq.ipv6mr_multiaddr = addr.sin6_addr;

				// the interface is identified by its index in host byte order
				// (RFC 3493), so no htonl() conversion must be applied here
				version (Android) {
					// ipv6mr_interface is defined as ipv6mr_ifindex on android
					mreq.ipv6mr_ifindex = interface_index;
				} else {
					mreq.ipv6mr_interface = interface_index;
				}
				// IPV6_JOIN_GROUP is an IPPROTO_IPV6 level option
				return () @trusted { return setsockopt(cast(sock_t)socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0;
		}
	}

	/**
		Receives a single datagram into `buffer`.

		Only `IOMode.immediate` and `IOMode.once` are allowed. If no datagram
		is available and `mode` is `once`, the operation is finished later by
		`onDgramRead`.
	*/
	void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish)
	@safe {
		assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");

		if (!isValid(socket)) {
			RefAddress addr;
			on_receive_finish(socket, IOStatus.invalidHandle, 0, addr);
			return;
		}

		sizediff_t ret;
		sockaddr_storage src_addr;
		socklen_t src_addr_len = src_addr.sizeof;
		() @trusted { ret = .recvfrom(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();

		if (ret < 0) {
			auto err = getSocketError();
			if (!err.among!(EAGAIN, EINPROGRESS)) {
				print("sock error %s for %s!", err, socket);
				on_receive_finish(socket, IOStatus.error, 0, null);
				return;
			}

			if (mode == IOMode.immediate) {
				on_receive_finish(socket, IOStatus.wouldBlock, 0, null);
			} else {
				with (m_loop.m_fds[socket].datagramSocket) {
					readCallback = on_receive_finish;
					readMode = mode;
					bytesRead = 0;
					readBuffer = buffer;
				}

				m_loop.setNotifyCallback!(EventType.read)(socket, &onDgramRead);
			}
			return;
		}

		scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr_len);
		if (ret == 0) {
			on_receive_finish(socket, IOStatus.disconnected, 0, src_addrc);
		} else {
			on_receive_finish(socket, IOStatus.ok, ret, src_addrc);
		}
	}

	/// `@nogc` wrapper around `receive`; casts away the missing `@nogc` attribute.
	package void receiveNoGC(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress) @safe nothrow @nogc on_receive_finish)
	@trusted @nogc {
		scope void delegate() @safe nothrow do_it = {
			receive(socket, buffer, mode, on_receive_finish);
		};
		(cast(void delegate() @safe nothrow @nogc)do_it)();
	}

	/// Cancels a pending `receive`; does nothing if none is in progress.
	void cancelReceive(DatagramSocketFD socket)
	@nogc {
		if (!isValid(socket)) return;

		auto slot = () @trusted { return &m_loop.m_fds[socket].datagramSocket(); } ();
		if (slot.readCallback is null) return;

		m_loop.setNotifyCallback!(EventType.read)(socket, null);
		slot.readCallback = null;
		slot.readBuffer = null;
	}

	/// Read-event handler finishing a receive started by `receive`.
	private void onDgramRead(FD fd)
	@safe {
		auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
		auto socket = cast(DatagramSocketFD)fd;

		sizediff_t ret;
		sockaddr_storage src_addr;
		socklen_t src_addr_len = src_addr.sizeof;
		() @trusted { ret = .recvfrom(cast(sock_t)socket, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max), 0, cast(sockaddr*)&src_addr, &src_addr_len); } ();

		if (ret < 0) {
			auto err = getSocketError();
			// spurious wake-up: nothing to read yet, keep waiting instead of
			// reporting a bogus success with a negative byte count
			if (err.among!(EAGAIN, EINPROGRESS)) return;

			auto l = lockHandle(socket);
			m_loop.setNotifyCallback!(EventType.read)(socket, null);
			slot.readCallback(socket, IOStatus.error, 0, null);
			return;
		}

		auto l = lockHandle(socket);
		m_loop.setNotifyCallback!(EventType.read)(socket, null);
		// use the length reported by recvfrom, as done in receive()
		scope src_addrc = new RefAddress(() @trusted { return cast(sockaddr*)&src_addr; } (), src_addr_len);
		auto cb = () @trusted { return cast(DatagramIOCallback)slot.readCallback; } ();
		slot.readCallback = null;
		cb(socket, IOStatus.ok, ret, src_addrc);
	}

	/**
		Sends a single datagram.

		If `target_address` is non-null, `sendto` is used and the address is
		stored in the socket slot; otherwise plain `send` is used. Only
		`IOMode.immediate` and `IOMode.once` are allowed. If the socket is not
		writable and `mode` is `once`, the operation is finished later by
		`onDgramWrite`.
	*/
	void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish)
	{
		assert(mode != IOMode.all, "Only IOMode.immediate and IOMode.once allowed for datagram sockets.");

		if (!isValid(socket)) {
			RefAddress addr;
			on_send_finish(socket, IOStatus.invalidHandle, 0, addr);
			return;
		}

		sizediff_t ret;
		if (target_address) {
			() @trusted { ret = .sendto(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS, target_address.name, target_address.nameLen); } ();
			m_loop.m_fds[socket].datagramSocket.targetAddr = target_address;
		} else {
			() @trusted { ret = .send(cast(sock_t)socket, buffer.ptr, min(buffer.length, int.max), SEND_FLAGS); } ();
		}

		if (ret < 0) {
			auto err = getSocketError();
			if (!err.among!(EAGAIN, EINPROGRESS)) {
				print("sock error %s!", err);
				on_send_finish(socket, IOStatus.error, 0, null);
				return;
			}

			if (mode == IOMode.immediate) {
				on_send_finish(socket, IOStatus.wouldBlock, 0, null);
			} else {
				with (m_loop.m_fds[socket].datagramSocket) {
					writeCallback = on_send_finish;
					writeMode = mode;
					bytesWritten = 0;
					writeBuffer = buffer;
				}

				m_loop.setNotifyCallback!(EventType.write)(socket, &onDgramWrite);
			}
			return;
		}

		on_send_finish(socket, IOStatus.ok, ret, null);
	}

	/// Cancels a pending `send`; a send must be in progress.
	void cancelSend(DatagramSocketFD socket)
	{
		if (!isValid(socket)) return;

		assert(m_loop.m_fds[socket].datagramSocket.writeCallback !is null, "Cancelling write when there is no write in progress.");
		m_loop.setNotifyCallback!(EventType.write)(socket, null);
		m_loop.m_fds[socket].datagramSocket.writeBuffer = null;
	}

	/// Write-event handler finishing a send started by `send`.
	private void onDgramWrite(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].datagramSocket(); } ();
		auto socket = cast(DatagramSocketFD)fd;

		sizediff_t ret;
		if (slot.targetAddr) {
			() @trusted { ret = .sendto(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), SEND_FLAGS, slot.targetAddr.name, slot.targetAddr.nameLen); } ();
		} else {
			() @trusted { ret = .send(cast(sock_t)socket, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max), SEND_FLAGS); } ();
		}

		if (ret < 0) {
			auto err = getSocketError();
			// spurious wake-up: still not writable, keep waiting instead of
			// reporting a bogus success with a negative byte count
			if (err.among!(EAGAIN, EINPROGRESS)) return;

			auto l = lockHandle(socket);
			m_loop.setNotifyCallback!(EventType.write)(socket, null);
			() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.error, 0, null);
			return;
		}

		auto l = lockHandle(socket);
		m_loop.setNotifyCallback!(EventType.write)(socket, null);
		() @trusted { return cast(DatagramIOCallback)slot.writeCallback; } ()(socket, IOStatus.ok, ret, null);
	}

	/**
		Checks whether `handle` refers to a slot inside the FD table whose
		validation counter matches the one stored in the handle.
	*/
	final override bool isValid(SocketFD handle)
	const {
		// valid indices are 0 .. length-1
		if (handle.value >= m_loop.m_fds.length) return false;
		return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
	}

	/// Increments the reference count of a valid, already referenced socket.
	final override void addRef(SocketFD fd)
	{
		if (!isValid(fd)) return;

		auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
		assert(slot.common.refCount > 0, "Adding reference to unreferenced socket FD.");
		slot.common.refCount++;
	}

	/**
		Decrements the reference count and closes the socket once the last
		reference is gone.

		Returns: `false` if the socket was closed by this call, `true`
			otherwise (including for invalid handles).
	*/
	final override bool releaseRef(SocketFD fd)
	@nogc {
		import taggedalgebraic : hasType;

		if (!isValid(fd)) return true;

		auto slot = () @trusted { return &m_loop.m_fds[fd]; } ();
		nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced socket FD.");
		// listening sockets have an incremented reference count because of setNotifyCallback
		int base_refcount = slot.specific.hasType!StreamListenSocketSlot ? 1 : 0;
		if (--slot.common.refCount == base_refcount) {
			m_loop.unregisterFD(fd, EventMask.read|EventMask.write);
			switch (slot.specific.kind) with (slot.specific.Kind) {
				default: assert(false, "File descriptor slot is not a socket.");
				case streamSocket:
					m_loop.clearFD!StreamSocketSlot(fd);
					break;
				case streamListen:
					m_loop.setNotifyCallback!(EventType.read)(fd, null);
					m_loop.clearFD!StreamListenSocketSlot(fd);
					break;
				case datagramSocket:
					m_loop.clearFD!DgramSocketSlot(fd);
					break;
			}
			closeSocket(cast(sock_t)fd);
			return false;
		}
		return true;
	}

	/// Sets a boolean datagram socket option. Returns `true` on success.
	final override bool setOption(DatagramSocketFD socket, DatagramSocketOption option, bool enable)
	{
		if (!isValid(socket)) return false;

		int proto, opt;
		final switch (option) {
			case DatagramSocketOption.broadcast: proto = SOL_SOCKET; opt = SO_BROADCAST; break;
			case DatagramSocketOption.multicastLoopback: proto = IPPROTO_IP; opt = IP_MULTICAST_LOOP; break;
		}
		int tmp = enable;
		return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
	}

	/// Sets a boolean stream socket option. Returns `true` on success.
	final override bool setOption(StreamSocketFD socket, StreamSocketOption option, bool enable)
	{
		if (!isValid(socket)) return false;

		int proto, opt;
		final switch (option) {
			case StreamSocketOption.noDelay: proto = IPPROTO_TCP; opt = TCP_NODELAY; break;
			case StreamSocketOption.keepAlive: proto = SOL_SOCKET; opt = SO_KEEPALIVE; break;
		}
		int tmp = enable;
		return () @trusted { return setsockopt(cast(sock_t)socket, proto, opt, &tmp, tmp.sizeof); } () == 0;
	}

	/// Forwards to the loop's `rawUserDataImpl`; returns `null` for invalid handles.
	final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		if (!isValid(descriptor)) return null;
		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
	}

/// Forwards to the loop's `rawUserDataImpl` for a listening stream socket; yields `null` if the handle fails `isValid`.
	final protected override void* rawUserData(StreamListenSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return isValid(descriptor)
			? m_loop.rawUserDataImpl(descriptor, size, initialize, destroy)
			: null;
	}

	/// Forwards to the loop's `rawUserDataImpl` for a datagram socket; yields `null` if the handle fails `isValid`.
	final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return isValid(descriptor)
			? m_loop.rawUserDataImpl(descriptor, size, initialize, destroy)
			: null;
	}

	/** Opens a new socket of the given address family and type.

		On Linux the non-blocking and close-on-exec flags are passed along
		with the socket type. Elsewhere the socket is configured after
		creation through `setSocketNonBlocking`, and on OSX `SO_NOSIGPIPE` is
		set as well.

		Returns: the new socket, or `-1` if `socket()` failed.
	*/
	private sock_t createSocket(AddressFamily family, int type)
	{
		sock_t fd;

		version (linux) {
			() @trusted { fd = socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); } ();
		} else {
			() @trusted { fd = socket(family, type, 0); } ();

			// only configure sockets that were actually created
			if (fd != -1) {
				setSocketNonBlocking(fd, true);

				// Prevent SIGPIPE on failed send
				version (OSX) {
					int enable = 1;
					() @trusted { setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &enable, enable.sizeof); } ();
				}
			}
		}

		return fd;
	}

	/** Takes a reference to `handle` that is released again when the
		returned value goes out of scope, so that the handle does not get
		destroyed in between.
	*/
	private auto lockHandle(H)(H handle)
	{
		static struct ScopedRef {
			PosixEventDriverSockets drv;
			H handle;
			@disable this(this);
			~this() { drv.releaseRef(handle); }
		}

		addRef(handle);
		return ScopedRef(this, handle);
	}
}

/// Per-descriptor state kept for a stream socket.
package struct StreamSocketSlot {
	alias Handle = StreamSocketFD;

	// read side
	size_t bytesRead;
	ubyte[] readBuffer;
	IOMode readMode;
	IOCallback readCallback; // FIXME: this callback type is specific to stream sockets

	// write side
	size_t bytesWritten;
	const(ubyte)[] writeBuffer;
	IOMode writeMode;
	IOCallback writeCallback; // FIXME: this callback type is specific to stream sockets

	// connection handling
	ConnectCallback connectCallback;
	ConnectionState state;
}

/// Per-descriptor state kept for a listening stream socket.
package struct StreamListenSocketSlot {
	alias Handle = StreamListenSocketFD;

	AcceptCallback acceptCallback;
}

/// Per-descriptor state kept for a datagram socket.
package struct DgramSocketSlot {
	alias Handle = DatagramSocketFD;

	// read side (presumably mirrors the write side; the receive path is not visible here)
	size_t bytesRead;
	ubyte[] readBuffer;
	IOMode readMode;
	DatagramIOCallback readCallback;

	// write side: filled in when a send has to wait for the socket to become writable
	size_t bytesWritten;
	const(ubyte)[] writeBuffer;
	IOMode writeMode;
	DatagramIOCallback writeCallback; // invoked with the result of the deferred send
	Address targetAddr; // explicit destination of the send, if one was given
}

/// Closes a socket descriptor using the platform's close function; the result is ignored.
private void closeSocket(sock_t sockfd)
@nogc nothrow {
	version (Windows) () @trusted { closesocket(sockfd); } ();
	else close(sockfd);
}

/** Switches a socket to non-blocking mode.

	Params:
		sockfd = the socket descriptor to configure
		close_on_exec = if `true`, additionally marks the descriptor as
			close-on-exec (ignored on Windows)

	Failures of the underlying system calls are ignored (best effort).
*/
private void setSocketNonBlocking(SocketFD.BaseType sockfd, bool close_on_exec = false)
@nogc nothrow {
	version (Windows) {
		uint enable = 1;
		() @trusted { ioctlsocket(sockfd, FIONBIO, &enable); } ();
	} else {
		() @trusted {
			immutable fd = cast(int)sockfd;

			// F_SETFL replaces the complete set of status flags, so merge
			// O_NONBLOCK into whatever is currently set.
			immutable status = fcntl(fd, F_GETFL, 0);
			fcntl(fd, F_SETFL, (status == -1 ? 0 : status) | O_NONBLOCK);

			// Close-on-exec is a descriptor flag, not a status flag: it has to
			// be set with F_SETFD/FD_CLOEXEC; passing O_CLOEXEC to F_SETFL has
			// no effect.
			if (close_on_exec) {
				immutable fdflags = fcntl(fd, F_GETFD, 0);
				fcntl(fd, F_SETFD, (fdflags == -1 ? 0 : fdflags) | FD_CLOEXEC);
			}
		} ();
	}
}

/// Returns the error code of the last failed socket call (`WSAGetLastError()` on Windows, `errno` elsewhere).
private int getSocketError()
@nogc nothrow {
	version (Windows) return WSAGetLastError();
	else return errno;
}

/** Determines a backlog size (presumably for `listen()`; the caller is not
	visible here).

	Returns: 128 by default. On Linux, the value read from
	`/proc/sys/net/core/somaxconn` if that file can be opened and contains a
	positive integer.
*/
private int getBacklogSize()
@trusted @nogc nothrow {
	int backlog = 128;
	version (linux)
	{
		import core.stdc.stdio : fclose, fopen, fscanf;
		// "e" requests close-on-exec for the temporary stream
		auto somaxconn = fopen("/proc/sys/net/core/somaxconn", "re");
		if (somaxconn)
		{
			int tmp;
			// keep the default if the file content is unusable
			if (fscanf(somaxconn, "%d", &tmp) == 1 && tmp > 0)
				backlog = tmp;
			fclose(somaxconn);
		}
	}
	return backlog;
}