/**
	Pipe support for the POSIX event drivers.

	Contains the poll/read/write based pipe driver, a placeholder driver whose
	operations are not implemented, and the per-descriptor state record that
	both read and write operations use to track pending work.
*/
module eventcore.drivers.posix.pipes;
@safe:

import eventcore.driver;
import eventcore.drivers.posix.driver;
import eventcore.internal.utils : nogc_assert, print;

import std.algorithm : min, max;


/**
	Pipe driver that performs non-blocking `read`/`write` calls directly and
	falls back to notification callbacks of the event loop when an operation
	cannot be completed right away.

	At most one read-type operation (`read` or `waitForData`) and one write
	operation may be pending per pipe at any time.
*/
final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
	import core.stdc.errno : errno, EAGAIN;
	import core.sys.posix.unistd : close, read, write;
	import core.sys.posix.fcntl;
	import core.sys.posix.poll;

	// Value stored in `PipeSlot.readMode` while a `waitForData` operation is
	// pending. `read` never leaves an `IOMode.immediate` operation pending, so
	// this value unambiguously tells the two kinds of pending operation apart.
	private enum IOMode waitForDataMarker = IOMode.immediate;

	private Loop m_loop;

	/// Creates a pipe driver operating on the file descriptor table of `loop`.
	this(Loop loop)
	@nogc {
		m_loop = loop;
	}

	/**
		Takes over an existing pipe descriptor.

		The descriptor is switched to non-blocking mode, marked close-on-exec
		and registered with the event loop for read, write and status events.

		Returns:
			The new handle, or `PipeFD.invalid` if `system_fd` is negative or
			the corresponding slot is already referenced.
	*/
	final override PipeFD adopt(int system_fd)
	{
		// A negative descriptor can never be valid and must not be used as an index.
		if (system_fd < 0)
			return PipeFD.invalid;

		if (m_loop.m_fds[system_fd].common.refCount) // FD already in use?
			return PipeFD.invalid;

		// O_NONBLOCK is a file status flag (F_SETFL), whereas FD_CLOEXEC is a
		// file descriptor flag and only takes effect when set through F_SETFD.
		() @trusted {
			fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK);
			fcntl(system_fd, F_SETFD, fcntl(system_fd, F_GETFD) | FD_CLOEXEC);
		} ();

		auto fd = m_loop.initFD!PipeFD(system_fd, FDFlags.none, PipeSlot.init);
		m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
		return fd;
	}

	/**
		Reads from the pipe into `buffer`.

		A first read is attempted immediately. `on_read_finish` is invoked
		synchronously if the handle is invalid, the read fails, the end of the
		stream is reached, the requested mode is already satisfied, or the
		read would block and `mode` is `IOMode.immediate`. Otherwise the
		operation stays pending and is continued by `onPipeRead`.

		Params:
			pipe = handle of the pipe to read from
			buffer = destination for the data
			mode = `IOMode.all` waits until the buffer is filled, `IOMode.immediate`
				never waits, any other mode completes after the first successful read
			on_read_finish = receives the final status and the number of bytes read
	*/
	final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
	{
		if (!isValid(pipe)) {
			on_read_finish(pipe, IOStatus.invalidHandle, 0);
			return;
		}

		auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();

		// Read failed
		if (ret < 0) {
			auto err = errno;
			if (err != EAGAIN) {
				print("Pipe error %s!", err);
				on_read_finish(pipe, IOStatus.error, 0);
				return;
			}
		}

		// EOF
		if (ret == 0 && buffer.length > 0) {
			on_read_finish(pipe, IOStatus.disconnected, 0);
			return;
		}

		// Handle immediate mode
		if (ret < 0 && mode == IOMode.immediate) {
			on_read_finish(pipe, IOStatus.wouldBlock, 0);
			return;
		}

		// Handle successful read
		if (ret >= 0) {
			buffer = buffer[ret .. $];

			// Handle completed read
			if (mode != IOMode.all || buffer.length == 0) {
				on_read_finish(pipe, IOStatus.ok, ret);
				return;
			}
		}

		// The operation stays pending: remember the remaining buffer and the
		// progress made so far.
		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
		assert(slot.readCallback is null, "Concurrent reads are not allowed.");
		slot.readCallback = on_read_finish;
		slot.readMode = mode;
		slot.bytesRead = max(ret, 0);
		slot.readBuffer = buffer;

		// Need to use EventType.status as well, as pipes don't otherwise notify
		// of closes
		m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead);
		m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead);
	}

	/// Notification handler that continues a read left pending by `read`.
	private void onPipeRead(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
		auto pipe = cast(PipeFD)fd;

		// Ends the pending operation and reports `status` to the user.
		void finalize(IOStatus status)
		{
			// Hold an extra reference while the notifications are removed and
			// the user callback runs.
			addRef(pipe);
			scope(exit) releaseRef(pipe);

			m_loop.setNotifyCallback!(EventType.read)(pipe, null);
			m_loop.setNotifyCallback!(EventType.status)(pipe, null);
			auto cb = slot.readCallback;
			slot.readCallback = null;
			slot.readBuffer = null;
			cb(pipe, status, slot.bytesRead);
		}

		ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } ();

		// Read failed
		if (ret < 0) {
			auto err = errno;
			if (err != EAGAIN) {
				print("Pipe error %s!", err);
				finalize(IOStatus.error);
				return;
			}
		}

		// EOF
		if (ret == 0 && slot.readBuffer.length > 0) {
			finalize(IOStatus.disconnected);
			return;
		}

		// Successful read. A negative result (EAGAIN) must never get here, as
		// it would be used as a slice index and added to the byte count.
		if (ret >= 0) {
			slot.readBuffer = slot.readBuffer[ret .. $];
			slot.bytesRead += ret;

			// Handle completed read
			if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
				finalize(IOStatus.ok);
				return;
			}
		}
	}

	/**
		Cancels the pending `read` or `waitForData` operation of `pipe`.

		The callback of the cancelled operation is not invoked. Does nothing
		for an invalid handle. It is an error to call this while no read-type
		operation is pending.
	*/
	final override void cancelRead(PipeFD pipe)
	{
		if (!isValid(pipe)) return;

		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
		assert(slot.readCallback !is null, "Cancelling read when there is no read in progress.");
		m_loop.setNotifyCallback!(EventType.read)(pipe, null);
		// read() additionally listens for status events, waitForData() does not.
		if (slot.readMode != waitForDataMarker)
			m_loop.setNotifyCallback!(EventType.status)(pipe, null);
		// Clear the callback, too, so that a new read can be started afterwards.
		slot.readCallback = null;
		slot.readBuffer = null;
	}

	/**
		Writes the contents of `buffer` to the pipe.

		A first write is attempted immediately. `on_write_finish` is invoked
		synchronously if the handle is invalid, the buffer is empty, the write
		fails, the requested mode is already satisfied, or the write would
		block and `mode` is `IOMode.immediate`. Otherwise the operation stays
		pending and is continued by `onPipeWrite`.

		Params:
			pipe = handle of the pipe to write to
			buffer = data to write
			mode = `IOMode.all` waits until everything is written, `IOMode.immediate`
				never waits, any other mode completes after the first successful write
			on_write_finish = receives the final status and the number of bytes written
	*/
	final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
	{
		if (!isValid(pipe)) {
			on_write_finish(pipe, IOStatus.invalidHandle, 0);
			return;
		}

		if (buffer.length == 0) {
			on_write_finish(pipe, IOStatus.ok, 0);
			return;
		}

		ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();

		if (ret < 0) {
			auto err = errno;
			if (err != EAGAIN) {
				on_write_finish(pipe, IOStatus.error, 0);
				return;
			}

			if (mode == IOMode.immediate) {
				on_write_finish(pipe, IOStatus.wouldBlock, 0);
				return;
			}
		} else {
			buffer = buffer[ret .. $];

			if (mode != IOMode.all || buffer.length == 0) {
				on_write_finish(pipe, IOStatus.ok, ret);
				return;
			}
		}

		// The operation stays pending: remember the unwritten remainder and
		// the progress made so far.
		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
		assert(slot.writeCallback is null, "Concurrent writes not allowed.");
		slot.writeCallback = on_write_finish;
		slot.writeMode = mode;
		slot.bytesWritten = max(ret, 0);
		slot.writeBuffer = buffer;

		m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite);
	}

	/// Notification handler that continues a write left pending by `write`.
	private void onPipeWrite(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
		auto pipe = cast(PipeFD)fd;

		// Ends the pending operation and reports `status` to the user.
		void finalize(IOStatus status)
		{
			// Hold an extra reference while the notification is removed and
			// the user callback runs.
			addRef(pipe);
			scope(exit) releaseRef(pipe);

			m_loop.setNotifyCallback!(EventType.write)(pipe, null);
			auto cb = slot.writeCallback;
			slot.writeCallback = null;
			slot.writeBuffer = null;
			cb(pipe, status, slot.bytesWritten);
		}

		ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } ();

		if (ret < 0) {
			// EAGAIN simply means that the next notification has to be awaited.
			auto err = errno;
			if (err != EAGAIN) {
				finalize(IOStatus.error);
			}
		} else {
			slot.bytesWritten += ret;
			slot.writeBuffer = slot.writeBuffer[ret .. $];

			if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
				finalize(IOStatus.ok);
			}
		}
	}

	/**
		Cancels the pending `write` operation of `pipe`.

		The callback of the cancelled operation is not invoked. Does nothing
		for an invalid handle. It is an error to call this while no write is
		pending.
	*/
	final override void cancelWrite(PipeFD pipe)
	{
		if (!isValid(pipe)) return;

		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
		assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress.");
		m_loop.setNotifyCallback!(EventType.write)(pipe, null);
		slot.writeCallback = null;
		slot.writeBuffer = null;
	}

	/**
		Waits until the pipe becomes readable without consuming any data.

		`on_data_available` is invoked synchronously if the handle is invalid
		or if polling the pipe already yields a result. Otherwise it is invoked
		from a later read notification. The byte count passed to the callback
		is always zero.
	*/
	final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
	{
		if (!isValid(pipe)) {
			on_data_available(pipe, IOStatus.invalidHandle, 0);
			return;
		}

		if (pollPipe(pipe, on_data_available))
			return;

		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();

		assert(slot.readCallback is null, "Concurrent reads are not allowed.");
		slot.readCallback = on_data_available;
		// Marks the pending operation as a wait, so that cancelRead knows that
		// no status notification was registered for it.
		slot.readMode = waitForDataMarker;
		slot.bytesRead = 0; // currently meaningless
		slot.readBuffer = null;
		m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable);
	}

	/// Notification handler for an operation left pending by `waitForData`.
	private void onPipeDataAvailable(FD fd)
	{
		auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
		auto pipe = cast(PipeFD)fd;

		auto callback = (PipeFD f, IOStatus s, size_t m) {
			addRef(f);
			scope(exit) releaseRef(f);

			// Finish the operation before handing control to the user, so that
			// a new read or wait can be started from within the callback.
			m_loop.setNotifyCallback!(EventType.read)(f, null);
			auto cb = slot.readCallback;
			slot.readCallback = null;
			slot.readBuffer = null;
			cb(f, s, m);
		};

		// If the poll is inconclusive, the wait simply stays registered.
		pollPipe(pipe, callback);
	}

	/**
		Checks with a zero-timeout `poll` whether the pipe is readable.

		Invokes `callback` with `IOStatus.ok` if data can be read, with
		`IOStatus.disconnected` if a hang-up was reported without readable
		data, and with `IOStatus.error` if polling failed or reported any
		other condition.

		Returns:
			`true` if `callback` was invoked, `false` if nothing was reported.
	*/
	private bool pollPipe(PipeFD pipe, PipeIOCallback callback)
	@trusted {
		// Use poll to check if any data is available
		pollfd pfd;
		pfd.fd = cast(int)pipe;
		pfd.events = POLLIN;
		int ret = poll(&pfd, 1, 0);

		if (ret == -1) {
			print("Error polling pipe: %s!", errno);
			callback(pipe, IOStatus.error, 0);
			return true;
		}

		if (ret == 1) {
			IOStatus status;
			if (pfd.revents & POLLIN) status = IOStatus.ok;
			else if (pfd.revents & POLLHUP) status = IOStatus.disconnected;
			else status = IOStatus.error;
			callback(pipe, status, 0);
			return true;
		}

		return false;
	}

	/**
		Closes the pipe and removes it from the event loop.

		`on_closed`, if not `null`, receives `CloseStatus.invalidHandle` for an
		invalid handle, `CloseStatus.ok` if the descriptor was closed
		successfully and `CloseStatus.ioError` otherwise.
	*/
	final override void close(PipeFD pipe, PipeCloseCallback on_closed)
	{
		if (!isValid(pipe)) {
			if (on_closed)
				on_closed(pipe, CloseStatus.invalidHandle);
			return;
		}

		int res = close(cast(int)pipe);
		m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
		m_loop.clearFD!PipeSlot(pipe);

		if (on_closed)
			on_closed(pipe, res == 0 ? CloseStatus.ok : CloseStatus.ioError);
	}

	/// Tests whether `handle` lies within the descriptor table and matches the
	/// validation counter stored in its slot.
	override bool isValid(PipeFD handle)
	const {
		if (handle.value >= m_loop.m_fds.length) return false;
		return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
	}

	/// Increments the reference count of `pipe`. Does nothing for an invalid handle.
	final override void addRef(PipeFD pipe)
	{
		if (!isValid(pipe)) return;

		auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
		assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD.");
		slot.common.refCount++;
	}

	/**
		Decrements the reference count of `pipe` and closes the pipe once the
		count reaches zero.

		Returns:
			`false` if this call closed the pipe, `true` otherwise (including
			the case of an invalid handle).
	*/
	final override bool releaseRef(PipeFD pipe)
	{
		if (!isValid(pipe)) return true;

		auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
		nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");

		if (--slot.common.refCount == 0) {
			close(pipe, null);
			return false;
		}
		return true;
	}

	/// Forwards the user data request for `fd` to the event loop.
	final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
	}
}

/**
	Placeholder pipe driver.

	No handle is ever considered valid and, apart from `isValid` and the
	invalid handle path of `close`, every operation fails with an assertion.
*/
final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
@safe: /*@nogc:*/ nothrow:
	/// The loop is accepted for interface compatibility only and is not used.
	this(Loop loop) {}

	/// Not implemented.
	override PipeFD adopt(int system_pipe_handle)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	override void cancelRead(PipeFD pipe)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	override void cancelWrite(PipeFD pipe)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
	{
		assert(false, "TODO!");
	}

	/// Reports `CloseStatus.invalidHandle` to `on_closed` (if not `null`) for
	/// an invalid handle; closing a valid handle is not implemented.
	override void close(PipeFD pipe, PipeCloseCallback on_closed)
	{
		if (!isValid(pipe)) {
			if (on_closed)
				on_closed(pipe, CloseStatus.invalidHandle);
			return;
		}

		assert(false, "TODO!");
	}

	/// Always returns `false`.
	override bool isValid(PipeFD handle)
	const {
		return false;
	}

	/// Not implemented.
	override void addRef(PipeFD pid)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	override bool releaseRef(PipeFD pid)
	{
		assert(false, "TODO!");
	}

	/// Not implemented.
	protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		assert(false, "TODO!");
	}
}


/// State of the pending read and write operations of a single pipe.
package struct PipeSlot {
	alias Handle = PipeFD;

	// Read side
	size_t bytesRead;            // bytes read so far by the pending read
	ubyte[] readBuffer;          // part of the buffer that still has to be filled
	IOMode readMode;             // mode of the pending read; IOMode.immediate marks a pending waitForData
	PipeIOCallback readCallback; // non-null while a read or waitForData is pending

	// Write side
	size_t bytesWritten;          // bytes written so far by the pending write
	const(ubyte)[] writeBuffer;   // data that still has to be written
	IOMode writeMode;             // mode of the pending write
	PipeIOCallback writeCallback; // non-null while a write is pending
}