1 module eventcore.drivers.threadedfile; 2 3 import eventcore.driver; 4 import eventcore.internal.ioworker; 5 import eventcore.internal.utils; 6 import core.atomic; 7 import core.stdc.errno; 8 import std.algorithm.comparison : among, min; 9 10 version (Posix) { 11 import core.sys.posix.fcntl; 12 import core.sys.posix.sys.stat; 13 import core.sys.posix.unistd; 14 } 15 version (Windows) { 16 import core.sys.windows.stat; 17 18 private { 19 // TODO: use CreateFile/HANDLE instead of the Posix API on Windows 20 21 extern(C) nothrow { 22 alias off_t = sizediff_t; 23 int open(in char* name, int mode, ...); 24 int chmod(in char* name, int mode); 25 int close(int fd) @safe; 26 int read(int fd, void *buffer, uint count); 27 int write(int fd, in void *buffer, uint count); 28 long _lseeki64(int fd, long offset, int origin) @safe; 29 } 30 31 enum O_RDONLY = 0; 32 enum O_WRONLY = 1; 33 enum O_RDWR = 2; 34 enum O_APPEND = 8; 35 enum O_CREAT = 0x0100; 36 enum O_TRUNC = 0x0200; 37 enum O_BINARY = 0x8000; 38 39 enum _S_IREAD = 0x0100; /* read permission, owner */ 40 enum _S_IWRITE = 0x0080; /* write permission, owner */ 41 alias stat_t = struct_stat; 42 } 43 } 44 else 45 { 46 enum O_BINARY = 0; 47 } 48 49 version (darwin) { 50 // NOTE: Always building for 64-bit, so these are identical 51 alias lseek64 = lseek; 52 } 53 54 version (Android) { 55 static if (!is(off64_t)) { 56 alias off64_t = long; 57 extern(C) off64_t lseek64(int, off64_t, int) @safe nothrow; 58 } 59 } 60 61 private { 62 enum SEEK_SET = 0; 63 enum SEEK_CUR = 1; 64 enum SEEK_END = 2; 65 } 66 67 68 final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFiles 69 { 70 import std.parallelism; 71 72 private { 73 enum ThreadedFileStatus { 74 idle, // -> initiated (by caller) 75 initiated, // -> processing (by worker) 76 processing, // -> cancelling, finished (by caller, worker) 77 cancelling, // -> cancelled (by worker) 78 cancelled, // -> idle (by event receiver) 79 finished // -> idle (by event receiver) 80 } 81 82 static struct IOInfo { 83 FileIOCallback callback; 84 shared ThreadedFileStatus status; 85 shared size_t bytesWritten; 86 shared IOStatus ioStatus; 87 88 void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb) 89 @safe nothrow { 90 auto st = safeAtomicLoad(this.status); 91 if (st == ThreadedFileStatus.finished) { 92 auto ios = safeAtomicLoad(this.ioStatus); 93 auto btw = safeAtomicLoad(this.bytesWritten); 94 auto cb = this.callback; 95 this.callback = null; 96 safeAtomicStore(this.status, ThreadedFileStatus.idle); 97 pre_cb(); 98 if (cb) { 99 log("fire callback"); 100 cb(fd, ios, btw); 101 } 102 } else if (st == ThreadedFileStatus.cancelled) { 103 this.callback = null; 104 safeAtomicStore(this.status, ThreadedFileStatus.idle); 105 pre_cb(); 106 log("ignore callback due to cancellation"); 107 } 108 } 109 } 110 111 static struct FileInfo { 112 IOInfo read; 113 IOInfo write; 114 115 uint validationCounter; 116 117 int refCount; 118 DataInitializer userDataDestructor; 119 ubyte[16*size_t.sizeof] userData; 120 } 121 122 IOWorkerPool m_fileThreadPool; 123 ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop 124 SmallIntegerSet!size_t m_activeReads; 125 SmallIntegerSet!size_t m_activeWrites; 126 EventID m_readyEvent = EventID.invalid; 127 bool m_waiting; 128 Events m_events; 129 } 130 131 @safe: nothrow: 132 133 this(Events events) 134 { 135 m_events = events; 136 } 137 138 void dispose() 139 { 140 m_fileThreadPool = IOWorkerPool.init; 141 142 if (m_readyEvent != EventID.invalid) { 143 log("finishing file events"); 144 if (m_waiting) 145 m_events.cancelWait(m_readyEvent, &onReady); 146 onReady(m_readyEvent); 147 m_events.releaseRef(m_readyEvent); 148 m_readyEvent = EventID.invalid; 149 log("finished file events"); 150 } 151 } 152 153 final override FileFD open(string path, FileOpenMode mode) 154 { 155 import std.string : toStringz; 156 157 import std.conv : octal; 158 int flags; 159 int amode; 160 final switch (mode) { 161 case FileOpenMode.read: flags = O_RDONLY|O_BINARY; break; 162 case FileOpenMode.readWrite: flags = O_RDWR|O_BINARY; break; 163 case FileOpenMode.createTrunc: flags = O_RDWR|O_CREAT|O_TRUNC|O_BINARY; amode = octal!644; break; 164 case FileOpenMode.append: flags = O_WRONLY|O_CREAT|O_APPEND|O_BINARY; amode = octal!644; break; 165 } 166 auto fd = () @trusted { return .open(path.toStringz(), flags, amode); } (); 167 if (fd < 0) return FileFD.init; 168 return adopt(fd); 169 } 170 171 final override FileFD adopt(int system_file_handle) 172 { 173 version (Windows) { 174 // TODO: check if FD is a valid file! 175 } else { 176 auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } (); 177 if (flags == -1) return FileFD.invalid; 178 } 179 180 if (m_files[system_file_handle].refCount > 0) return FileFD.invalid; 181 auto vc = m_files[system_file_handle].validationCounter; 182 m_files[system_file_handle] = FileInfo.init; 183 m_files[system_file_handle].refCount = 1; 184 m_files[system_file_handle].validationCounter = vc + 1; 185 return FileFD(system_file_handle, vc + 1); 186 } 187 188 void close(FileFD file, FileCloseCallback on_closed) 189 { 190 if (!isValid(file)) { 191 on_closed(file, CloseStatus.invalidHandle); 192 return; 193 } 194 195 // TODO: close may block and should be executed in a worker thread 196 int res = .close(cast(int)file.value); 197 198 m_files[file.value] = FileInfo.init; 199 200 if (on_closed) 201 on_closed(file, res == 0 ? CloseStatus.ok : CloseStatus.ioError); 202 } 203 204 ulong getSize(FileFD file) 205 { 206 if (!isValid(file)) return ulong.max; 207 208 version (linux) { 209 // stat_t seems to be defined wrong on linux/64 210 return .lseek64(cast(int)file, 0, SEEK_END); 211 } else version (Windows) { 212 return _lseeki64(cast(int)file, 0, SEEK_END); 213 } else { 214 stat_t st; 215 () @trusted { fstat(cast(int)file, &st); } (); 216 return st.st_size; 217 } 218 } 219 220 override void truncate(FileFD file, ulong size, FileIOCallback on_finish) 221 { 222 if (!isValid(file)) return; 223 224 version (Posix) { 225 // FIXME: do this in the thread pool 226 227 static if (off_t.max < ulong.max) { 228 if (size > off_t.max) { 229 on_finish(file, IOStatus.error, 0); 230 return; 231 } 232 } 233 234 if (ftruncate(cast(int)file, cast(off_t)size) != 0) { 235 on_finish(file, IOStatus.error, 0); 236 return; 237 } 238 on_finish(file, IOStatus.ok, 0); 239 } else version (Windows) { 240 version (Win64) { 241 import core.sys.windows.windows : FILE_BEGIN, HANDLE, INVALID_HANDLE_VALUE, 242 LARGE_INTEGER, SetFilePointerEx, SetEndOfFile; 243 import core.stdc.stdio : _get_osfhandle; 244 245 auto h = () @trusted { return cast(HANDLE)_get_osfhandle(cast(int)file); } (); 246 if (h == INVALID_HANDLE_VALUE) { 247 on_finish(file, IOStatus.error, 0); 248 return; 249 } 250 LARGE_INTEGER ls = { QuadPart: size }; 251 if (!() @trusted { return SetFilePointerEx(h, ls, null, FILE_BEGIN); } ()) { 252 on_finish(file, IOStatus.error, 0); 253 return; 254 } 255 if (!() @trusted { return SetEndOfFile(h); } ()) { 256 on_finish(file, IOStatus.error, 0); 257 return; 258 } 259 on_finish(file, IOStatus.ok, 0); 260 } else { 261 on_finish(file, IOStatus.error, 0); 262 } 263 } else { 264 on_finish(file, IOStatus.error, 0); 265 } 266 } 267 268 269 final override void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode, FileIOCallback on_write_finish) 270 { 271 if (!isValid(file)) { 272 on_write_finish(file, IOStatus.invalidHandle, 0); 273 return; 274 } 275 276 //assert(this.writable); 277 auto f = () @trusted { return &m_files[file]; } (); 278 279 if (!safeCAS(f.write.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) 280 assert(false, "Concurrent file writes are not allowed."); 281 assert(f.write.callback is null, "Concurrent file writes are not allowed."); 282 f.write.callback = on_write_finish; 283 m_activeWrites.insert(file.value); 284 threadSetup(); 285 log("start write task"); 286 try { 287 auto thiss = () @trusted { return cast(shared)this; } (); 288 auto fs = () @trusted { return cast(shared)f; } (); 289 m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer)); 290 startWaiting(); 291 } catch (Exception e) { 292 m_activeWrites.remove(file.value); 293 on_write_finish(file, IOStatus.error, 0); 294 return; 295 } 296 } 297 298 final override void cancelWrite(FileFD file) 299 { 300 if (!isValid(file)) return; 301 302 assert(m_activeWrites.contains(file.value), "Cancelling write when no write is in progress."); 303 304 auto f = &m_files[file].write; 305 f.callback = null; 306 m_activeWrites.remove(file.value); 307 m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind 308 safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); 309 } 310 311 final override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish) 312 { 313 if (!isValid(file)) { 314 on_read_finish(file, IOStatus.invalidHandle, 0); 315 return; 316 } 317 318 auto f = () @trusted { return &m_files[file]; } (); 319 320 if (!safeCAS(f.read.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated)) 321 assert(false, "Concurrent file reads are not allowed."); 322 assert(f.read.callback is null, "Concurrent file reads are not allowed."); 323 f.read.callback = on_read_finish; 324 m_activeReads.insert(file.value); 325 threadSetup(); 326 log("start read task"); 327 try { 328 auto thiss = () @trusted { return cast(shared)this; } (); 329 auto fs = () @trusted { return cast(shared)f; } (); 330 m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer)); 331 startWaiting(); 332 } catch (Exception e) { 333 m_activeReads.remove(file.value); 334 on_read_finish(file, IOStatus.error, 0); 335 return; 336 } 337 } 338 339 final override void cancelRead(FileFD file) 340 { 341 if (!isValid(file)) return; 342 343 assert(m_activeReads.contains(file.value), "Cancelling read when no read is in progress."); 344 345 auto f = &m_files[file].read; 346 f.callback = null; 347 m_activeReads.remove(file.value); 348 m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind 349 safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); 350 } 351 352 final override bool isValid(FileFD handle) 353 const { 354 if (handle.value >= m_files.length) return false; 355 return m_files[handle.value].validationCounter == handle.validationCounter; 356 } 357 358 final override void addRef(FileFD descriptor) 359 { 360 if (!isValid(descriptor)) return; 361 362 m_files[descriptor].refCount++; 363 } 364 365 final override bool releaseRef(FileFD descriptor) 366 { 367 if (!isValid(descriptor)) return true; 368 369 auto f = () @trusted { return &m_files[descriptor]; } (); 370 if (!--f.refCount) { 371 close(descriptor, null); 372 assert(!m_activeReads.contains(descriptor.value)); 373 assert(!m_activeWrites.contains(descriptor.value)); 374 return false; 375 } 376 return true; 377 } 378 379 protected final override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) 380 @system { 381 if (!isValid(descriptor)) return null; 382 383 FileInfo* fds = &m_files[descriptor]; 384 assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy, 385 "Requesting user data with differing type (destructor)."); 386 assert(size <= FileInfo.userData.length, "Requested user data is too large."); 387 if (size > FileInfo.userData.length) assert(false); 388 if (!fds.userDataDestructor) { 389 initialize(fds.userData.ptr); 390 fds.userDataDestructor = destroy; 391 } 392 return fds.userData.ptr; 393 } 394 395 /// private 396 static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer) 397 { 398 log("task fun"); 399 shared(IOInfo)* f = mixin("&fi."~op); 400 log("start processing"); 401 402 if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing)) 403 assert(false, "File slot not in initiated state when processor task is started."); 404 405 auto bytes = buffer; 406 version (Windows) { 407 ._lseeki64(cast(int)file, offset, SEEK_SET); 408 } else .lseek64(cast(int)file, offset, SEEK_SET); 409 410 scope (exit) { 411 log("trigger event"); 412 files.m_events.trigger(files.m_readyEvent, true); 413 } 414 415 if (bytes.length == 0) safeAtomicStore(f.ioStatus, IOStatus.ok); 416 417 while (bytes.length > 0) { 418 auto sz = min(bytes.length, 512*1024); 419 auto ret = () @trusted { return mixin("."~op)(cast(int)file, bytes.ptr, cast(uint)sz); } (); 420 if (ret != sz) { 421 safeAtomicStore(f.ioStatus, IOStatus.error); 422 log("error"); 423 break; 424 } 425 bytes = bytes[sz .. $]; 426 log("check for cancel"); 427 if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return; 428 } 429 430 safeAtomicStore(f.bytesWritten, buffer.length - bytes.length); 431 log("wait for status set"); 432 while (true) { 433 if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break; 434 if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) break; 435 } 436 } 437 438 private void onReady(EventID) 439 { 440 log("ready event"); 441 foreach (f; m_activeReads) { 442 auto fd = FileFD(f, m_files[f].validationCounter); 443 m_files[f].read.finalize(fd, { m_activeReads.remove(f); }); 444 } 445 446 foreach (f; m_activeWrites) { 447 auto fd = FileFD(f, m_files[f].validationCounter); 448 m_files[f].write.finalize(fd, { m_activeWrites.remove(f); }); 449 } 450 451 m_waiting = false; 452 startWaiting(); 453 } 454 455 private void startWaiting() 456 { 457 if (!m_waiting && (!m_activeWrites.empty || !m_activeReads.empty)) { 458 log("wait for ready"); 459 m_events.wait(m_readyEvent, &onReady); 460 m_waiting = true; 461 } 462 } 463 464 private void threadSetup() 465 { 466 if (m_readyEvent == EventID.invalid) { 467 log("create file event"); 468 m_readyEvent = m_events.create(); 469 } 470 if (!m_fileThreadPool) { 471 log("aquire thread pool"); 472 m_fileThreadPool = acquireIOWorkerPool(); 473 } 474 } 475 } 476 477 private auto safeAtomicLoad(T)(ref shared(T) v) @trusted { return atomicLoad(v); } 478 private auto safeAtomicStore(T)(ref shared(T) v, T a) @trusted { return atomicStore(v, a); } 479 private auto safeCAS(T, U, V)(ref shared(T) v, U a, V b) @trusted { return cas(&v, a, b); } 480 481 private void log(ARGS...)(string fmt, ARGS args) 482 @trusted nothrow { 483 debug (EventCoreLogFiles) { 484 scope (failure) assert(false); 485 import core.thread : Thread; 486 import std.stdio : writef, writefln; 487 writef("[%s] ", Thread.getThis().name); 488 writefln(fmt, args); 489 } 490 }