1 module eventcore.drivers.threadedfile;
2 
3 import eventcore.driver;
4 import eventcore.internal.ioworker;
5 import eventcore.internal.utils;
6 import core.atomic;
7 import core.stdc.errno;
8 import std.algorithm.comparison : among, min;
9 
10 version (Posix) {
11 	import core.sys.posix.fcntl;
12 	import core.sys.posix.sys.stat;
13 	import core.sys.posix.unistd;
14 }
version (Windows) {
	import core.sys.windows.stat;

	private {
		// TODO: use CreateFile/HANDLE instead of the Posix API on Windows

		// C-linkage declarations of the Posix-style low-level file functions
		// that the driver calls on Windows.
		extern(C) nothrow {
			alias off_t = sizediff_t;
			int open(in char* name, int mode, ...);
			int chmod(in char* name, int mode);
			int close(int fd) @safe;
			int read(int fd, void *buffer, uint count);
			int write(int fd, in void *buffer, uint count);
			long _lseeki64(int fd, long offset, int origin) @safe;
		}

		// Flag values passed to open().
		enum {
			O_RDONLY = 0,
			O_WRONLY = 1,
			O_RDWR = 2,
			O_APPEND = 8,
			O_CREAT = 0x0100,
			O_TRUNC = 0x0200,
			O_BINARY = 0x8000
		}

		// Permission bits.
		enum {
			_S_IREAD = 0x0100,  /* read permission, owner */
			_S_IWRITE = 0x0080  /* write permission, owner */
		}

		alias stat_t = struct_stat;
	}
} else {
	// Defined as zero so that it can be OR-ed into the open() flags
	// unconditionally on platforms other than Windows.
	enum O_BINARY = 0;
}
48 
// NOTE: Always building for 64-bit, so `lseek` and `lseek64` are identical
//       and the 64-bit name can simply forward to the plain one.
version (darwin) alias lseek64 = lseek;
53 
// Fallback declarations for Android, used only when `off64_t` is not already
// known at this point.
version (Android) static if (!is(off64_t)) {
	alias off64_t = long;
	extern(C) off64_t lseek64(int, off64_t, int) @safe nothrow;
}
60 
// Origin values for the lseek family of functions, declared locally so that
// the same names are available on every platform.
private enum {
	SEEK_SET = 0,
	SEEK_CUR = 1,
	SEEK_END = 2
}
66 
67 
/** File driver that performs blocking file I/O on worker threads.

	Reads and writes are packaged as tasks and handed to an `IOWorkerPool`.
	The worker signals completion through a driver event (`m_readyEvent`);
	`onReady` then runs on the waiting side and invokes the user callbacks.

	Each file has one read slot and one write slot, so at most one read and
	one write may be in flight per file at any time.

	NOTE(review): every task first seeks and then transfers on the shared
	descriptor, and `getSize` also seeks on some platforms. A read and a
	write running concurrently on the same file can therefore disturb each
	other's file position - confirm whether callers rely on this.
*/
final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFiles
{
	import std.parallelism;

	private {
		/// Life cycle of a single read or write slot.
		enum ThreadedFileStatus {
			idle,         // -> initiated                 (by caller)
			initiated,    // -> processing                (by worker)
			processing,   // -> cancelling, finished      (by caller, worker)
			cancelling,   // -> cancelled                 (by worker)
			cancelled,    // -> idle                      (by event receiver)
			finished      // -> idle                      (by event receiver)
		}

		/// State of one direction (read or write) of a file.
		static struct IOInfo {
			FileIOCallback callback;
			shared ThreadedFileStatus status;
			shared size_t bytesWritten;
			shared IOStatus ioStatus;

			/** Completes an operation that has reached a terminal state.

				For `finished` the slot is reset to `idle`, `pre_cb` is
				invoked and then the stored callback (if any) is called with
				the recorded status and byte count. For `cancelled` the slot
				is reset and `pre_cb` is invoked, but no callback is fired.
				Any other state is left untouched.

				The slot is reset *before* the callback runs so that the
				callback may immediately start a new operation.
			*/
			void finalize(FileFD fd, scope void delegate() @safe nothrow pre_cb)
			@safe nothrow {
				auto st = safeAtomicLoad(this.status);
				if (st == ThreadedFileStatus.finished) {
					auto ios = safeAtomicLoad(this.ioStatus);
					auto btw = safeAtomicLoad(this.bytesWritten);
					auto cb = this.callback;
					this.callback = null;
					safeAtomicStore(this.status, ThreadedFileStatus.idle);
					pre_cb();
					if (cb) {
						log("fire callback");
						cb(fd, ios, btw);
					}
				} else if (st == ThreadedFileStatus.cancelled) {
					this.callback = null;
					safeAtomicStore(this.status, ThreadedFileStatus.idle);
					pre_cb();
					log("ignore callback due to cancellation");
				}
			}

			/** Returns a slot that was left in a terminal state back to `idle`.

				Only `cancelled` and `finished` are recycled; in both states
				the worker task has already stopped touching the slot. A slot
				that is still `initiated`, `processing` or `cancelling` is
				left alone.
			*/
			void recycleAbandoned()
			@safe nothrow {
				if (!safeCAS(this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle))
					safeCAS(this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle);
			}
		}

		/// Per-descriptor bookkeeping, indexed by the descriptor number.
		static struct FileInfo {
			IOInfo read;
			IOInfo write;

			uint validationCounter;

			int refCount;
			DataInitializer userDataDestructor;
			ubyte[16*size_t.sizeof] userData;
		}

		IOWorkerPool m_fileThreadPool;
		ChoppedVector!FileInfo m_files; // TODO: use the one from the posix loop
		SmallIntegerSet!size_t m_activeReads;
		SmallIntegerSet!size_t m_activeWrites;
		EventID m_readyEvent = EventID.invalid;
		bool m_waiting;
		Events m_events;
	}

	@safe: nothrow:

	/// Creates the driver; `events` is used to signal completion of tasks.
	this(Events events)
	{
		m_events = events;
	}

	/** Releases the worker pool and the ready event.

		Runs `onReady` one last time so that operations that have already
		reached a terminal state are still finalized.
	*/
	void dispose()
	{
		m_fileThreadPool = IOWorkerPool.init;

		if (m_readyEvent != EventID.invalid) {
			log("finishing file events");
			if (m_waiting)
				m_events.cancelWait(m_readyEvent, &onReady);
			onReady(m_readyEvent);
			m_events.releaseRef(m_readyEvent);
			m_readyEvent = EventID.invalid;
			log("finished file events");
		}
	}

	/** Opens `path` with the flags derived from `mode` and adopts the
		resulting descriptor.

		Returns: the new handle, or an invalid handle if the file could not
			be opened or the descriptor could not be adopted.
	*/
	final override FileFD open(string path, FileOpenMode mode)
	{
		import std.conv : octal;
		import std.string : toStringz;

		int flags;
		int amode;
		final switch (mode) {
			case FileOpenMode.read: flags = O_RDONLY|O_BINARY; break;
			case FileOpenMode.readWrite: flags = O_RDWR|O_BINARY; break;
			case FileOpenMode.createTrunc: flags = O_RDWR|O_CREAT|O_TRUNC|O_BINARY; amode = octal!644; break;
			case FileOpenMode.append: flags = O_WRONLY|O_CREAT|O_APPEND|O_BINARY; amode = octal!644; break;
		}
		auto fd = () @trusted { return .open(path.toStringz(), flags, amode); } ();
		if (fd < 0) return FileFD.init;

		auto handle = adopt(fd);
		// If the descriptor could not be adopted nobody else knows about it,
		// so close it here instead of leaking it.
		if (!isValid(handle)) .close(fd);
		return handle;
	}

	/** Starts tracking an existing descriptor with an initial reference
		count of one.

		Returns: the new handle, or `FileFD.invalid` if the descriptor is
			negative, is rejected by the system, or is already tracked.
	*/
	final override FileFD adopt(int system_file_handle)
	{
		// A negative value must never be used as an index into m_files.
		if (system_file_handle < 0) return FileFD.invalid;

		version (Windows) {
			// TODO: check if FD is a valid file!
		} else {
			auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } ();
			if (flags == -1) return FileFD.invalid;
		}

		if (m_files[system_file_handle].refCount > 0) return FileFD.invalid;
		// Carry the validation counter over the reset so that handles from
		// a previous use of this descriptor number do not become valid.
		auto vc = m_files[system_file_handle].validationCounter;
		m_files[system_file_handle] = FileInfo.init;
		m_files[system_file_handle].refCount = 1;
		m_files[system_file_handle].validationCounter = vc + 1;
		return FileFD(system_file_handle, vc + 1);
	}

	/** Closes the descriptor and resets its bookkeeping slot.

		`on_closed` may be `null`. If given, it is called with
		`CloseStatus.invalidHandle` for an invalid handle, otherwise with
		`ok` or `ioError` depending on the result of the system call.
	*/
	void close(FileFD file, FileCloseCallback on_closed)
	{
		if (!isValid(file)) {
			if (on_closed)
				on_closed(file, CloseStatus.invalidHandle);
			return;
		}

		auto slot = () @trusted { return &m_files[file.value]; } ();

		// Give user data registered through rawUserData() a chance to clean
		// up before the slot is wiped.
		if (slot.userDataDestructor) {
			() @trusted {
				try slot.userDataDestructor(slot.userData.ptr);
				catch (Exception e) assert(false, e.msg);
			} ();
		}

		// TODO: close may block and should be executed in a worker thread
		int res = .close(cast(int)file.value);

		// Reset the slot but advance the validation counter, so that the
		// closed handle stays invalid even after the descriptor number is
		// adopted again.
		auto vc = slot.validationCounter;
		m_files[file.value] = FileInfo.init;
		m_files[file.value].validationCounter = vc + 1;

		if (on_closed)
			on_closed(file, res == 0 ? CloseStatus.ok : CloseStatus.ioError);
	}

	/** Determines the current size of the file in bytes.

		On Linux and Windows this is done by seeking to the end of the file,
		which moves the file position of the descriptor.

		Returns: the size, or `ulong.max` for an invalid handle or if the
			size could not be determined.
	*/
	ulong getSize(FileFD file)
	{
		if (!isValid(file)) return ulong.max;

		version (linux) {
			// stat_t seems to be defined wrong on linux/64
			// (a failed seek returns -1, which converts to ulong.max)
			return .lseek64(cast(int)file, 0, SEEK_END);
		} else version (Windows) {
			return _lseeki64(cast(int)file, 0, SEEK_END);
		} else {
			stat_t st;
			auto res = () @trusted { return fstat(cast(int)file, &st); } ();
			// Do not report the contents of an unfilled stat_t as a size.
			if (res != 0) return ulong.max;
			return st.st_size;
		}
	}

	/** Sets the size of the file to `size` bytes.

		The operation currently runs synchronously; `on_finish` is invoked
		before this method returns, always with a byte count of zero.
		An invalid handle is reported as `IOStatus.invalidHandle`.
	*/
	override void truncate(FileFD file, ulong size, FileIOCallback on_finish)
	{
		if (!isValid(file)) {
			on_finish(file, IOStatus.invalidHandle, 0);
			return;
		}

		version (Posix) {
			// FIXME: do this in the thread pool

			static if (off_t.max < ulong.max) {
				if (size > off_t.max) {
					on_finish(file, IOStatus.error, 0);
					return;
				}
			}

			if (ftruncate(cast(int)file, cast(off_t)size) != 0) {
				on_finish(file, IOStatus.error, 0);
				return;
			}
			on_finish(file, IOStatus.ok, 0);
		} else version (Windows) {
			version (Win64) {
				import core.sys.windows.windows : FILE_BEGIN, HANDLE, INVALID_HANDLE_VALUE,
					LARGE_INTEGER, SetFilePointerEx, SetEndOfFile;
				import core.stdc.stdio : _get_osfhandle;

				auto h = () @trusted { return cast(HANDLE)_get_osfhandle(cast(int)file); } ();
				if (h == INVALID_HANDLE_VALUE) {
					on_finish(file, IOStatus.error, 0);
					return;
				}
				LARGE_INTEGER ls = { QuadPart: size };
				if (!() @trusted { return SetFilePointerEx(h, ls, null, FILE_BEGIN); } ()) {
					on_finish(file, IOStatus.error, 0);
					return;
				}
				if (!() @trusted { return SetEndOfFile(h); } ()) {
					on_finish(file, IOStatus.error, 0);
					return;
				}
				on_finish(file, IOStatus.ok, 0);
			} else {
				on_finish(file, IOStatus.error, 0);
			}
		} else {
			on_finish(file, IOStatus.error, 0);
		}
	}


	/** Schedules writing `buffer` to the file at `offset` on a worker thread.

		`on_write_finish` is called with `IOStatus.invalidHandle` for an
		invalid handle and with `IOStatus.error` if the task could not be
		scheduled; otherwise it is called from `onReady` once the worker has
		finished. Only one write per file may be in flight.
	*/
	final override void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode, FileIOCallback on_write_finish)
	{
		if (!isValid(file)) {
			on_write_finish(file, IOStatus.invalidHandle, 0);
			return;
		}

		//assert(this.writable);
		auto f = () @trusted { return &m_files[file]; } ();

		// A cancelled write is removed from the active set and hence never
		// finalized by onReady. Recycle its slot once the worker is done
		// with it, so that the file can be written to again.
		if (!m_activeWrites.contains(file.value))
			f.write.recycleAbandoned();

		if (!safeCAS(f.write.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated))
			assert(false, "Concurrent file writes are not allowed.");
		assert(f.write.callback is null, "Concurrent file writes are not allowed.");
		f.write.callback = on_write_finish;
		m_activeWrites.insert(file.value);
		threadSetup();
		log("start write task");
		try {
			auto thiss = () @trusted { return cast(shared)this; } ();
			auto fs = () @trusted { return cast(shared)f; } ();
			m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(thiss, fs, file, offset, buffer));
			startWaiting();
		} catch (Exception e) {
			// Undo the slot reservation, otherwise every later write on
			// this file would trip the "concurrent writes" assertion.
			m_activeWrites.remove(file.value);
			f.write.callback = null;
			safeCAS(f.write.status, ThreadedFileStatus.initiated, ThreadedFileStatus.idle);
			on_write_finish(file, IOStatus.error, 0);
			return;
		}
	}

	/** Cancels the write that is in flight for `file`.

		The callback is dropped and the file is removed from the set of
		active writes. A worker that is currently processing the task is
		asked to stop after its current chunk.
	*/
	final override void cancelWrite(FileFD file)
	{
		if (!isValid(file)) return;

		assert(m_activeWrites.contains(file.value), "Cancelling write when no write is in progress.");

		auto f = &m_files[file].write;
		f.callback = null;
		m_activeWrites.remove(file.value);
		m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
		safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling);
	}

	/** Schedules reading into `buffer` from the file at `offset` on a worker
		thread.

		`on_read_finish` is called with `IOStatus.invalidHandle` for an
		invalid handle and with `IOStatus.error` if the task could not be
		scheduled; otherwise it is called from `onReady` once the worker has
		finished. Only one read per file may be in flight.
	*/
	final override void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish)
	{
		if (!isValid(file)) {
			on_read_finish(file, IOStatus.invalidHandle, 0);
			return;
		}

		auto f = () @trusted { return &m_files[file]; } ();

		// A cancelled read is removed from the active set and hence never
		// finalized by onReady. Recycle its slot once the worker is done
		// with it, so that the file can be read from again.
		if (!m_activeReads.contains(file.value))
			f.read.recycleAbandoned();

		if (!safeCAS(f.read.status, ThreadedFileStatus.idle, ThreadedFileStatus.initiated))
			assert(false, "Concurrent file reads are not allowed.");
		assert(f.read.callback is null, "Concurrent file reads are not allowed.");
		f.read.callback = on_read_finish;
		m_activeReads.insert(file.value);
		threadSetup();
		log("start read task");
		try {
			auto thiss = () @trusted { return cast(shared)this; } ();
			auto fs = () @trusted { return cast(shared)f; } ();
			m_fileThreadPool.put(task!(taskFun!("read", ubyte))(thiss, fs, file, offset, buffer));
			startWaiting();
		} catch (Exception e) {
			// Undo the slot reservation, otherwise every later read on
			// this file would trip the "concurrent reads" assertion.
			m_activeReads.remove(file.value);
			f.read.callback = null;
			safeCAS(f.read.status, ThreadedFileStatus.initiated, ThreadedFileStatus.idle);
			on_read_finish(file, IOStatus.error, 0);
			return;
		}
	}

	/** Cancels the read that is in flight for `file`.

		The callback is dropped and the file is removed from the set of
		active reads. A worker that is currently processing the task is
		asked to stop after its current chunk.
	*/
	final override void cancelRead(FileFD file)
	{
		if (!isValid(file)) return;

		assert(m_activeReads.contains(file.value), "Cancelling read when no read is in progress.");

		auto f = &m_files[file].read;
		f.callback = null;
		m_activeReads.remove(file.value);
		m_events.trigger(m_readyEvent, true); // ensure that no stale wait operation is left behind
		safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling);
	}

	/// Checks that the handle is in range and carries the slot's current
	/// validation counter.
	final override bool isValid(FileFD handle)
	const {
		if (handle.value >= m_files.length) return false;
		return m_files[handle.value].validationCounter == handle.validationCounter;
	}

	/// Increments the reference count of a valid handle.
	final override void addRef(FileFD descriptor)
	{
		if (!isValid(descriptor)) return;

		m_files[descriptor].refCount++;
	}

	/** Decrements the reference count and closes the file when it drops to
		zero.

		Returns: `false` if the file was closed by this call, `true`
			otherwise (including for invalid handles).
	*/
	final override bool releaseRef(FileFD descriptor)
	{
		if (!isValid(descriptor)) return true;

		auto f = () @trusted { return &m_files[descriptor]; } ();
		if (!--f.refCount) {
			close(descriptor, null);
			assert(!m_activeReads.contains(descriptor.value));
			assert(!m_activeWrites.contains(descriptor.value));
			return false;
		}
		return true;
	}

	/** Returns the user data area of the file, initializing it on first use.

		`destroy` is remembered as the destructor of the user data; later
		requests must pass the same destructor. `size` must not exceed the
		size of the fixed storage area.

		Returns: pointer to the storage, or `null` for an invalid handle.
	*/
	protected final override void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		if (!isValid(descriptor)) return null;

		FileInfo* fds = &m_files[descriptor];
		assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
			"Requesting user data with differing type (destructor).");
		assert(size <= FileInfo.userData.length, "Requested user data is too large.");
		// Enforced in release builds, too.
		if (size > FileInfo.userData.length) assert(false);
		if (!fds.userDataDestructor) {
			initialize(fds.userData.ptr);
			fds.userDataDestructor = destroy;
		}
		return fds.userData.ptr;
	}

	/** Worker task that carries out a single read or write.

		Params:
			op = name of the I/O function and of the `FileInfo` slot to use,
				"read" or "write"
			files = the driver, used to trigger the ready event
			fi = bookkeeping slot of the file
			file = the file to operate on
			offset = absolute file position to start at
			buffer = data to write, or memory to read into

		The data is transferred in chunks of at most 512 KiB, checking for
		cancellation after each chunk. The ready event is triggered when the
		task ends, whatever the outcome.
	*/
	/// private
	static void taskFun(string op, UB)(shared(ThreadedFileEventDriver) files, shared(FileInfo)* fi, FileFD file, ulong offset, UB[] buffer)
	{
		log("task fun");
		shared(IOInfo)* f = mixin("&fi."~op);
		log("start processing");

		if (!safeCAS(f.status, ThreadedFileStatus.initiated, ThreadedFileStatus.processing))
			assert(false, "File slot not in initiated state when processor task is started.");

		scope (exit) {
			log("trigger event");
			files.m_events.trigger(files.m_readyEvent, true);
		}

		version (Windows) {
			auto pos = ._lseeki64(cast(int)file, offset, SEEK_SET);
		} else {
			auto pos = .lseek64(cast(int)file, offset, SEEK_SET);
		}

		auto bytes = buffer;

		// Always start from a defined status. Otherwise the result of a
		// previous, failed operation on this slot would be reported again
		// for a transfer that succeeds.
		immutable seek_ok = pos >= 0;
		safeAtomicStore(f.ioStatus, seek_ok ? IOStatus.ok : IOStatus.error);

		// Transferring data at an unknown position would be worse than
		// failing, so a failed seek skips the transfer altogether.
		if (!seek_ok) log("error");

		while (seek_ok && bytes.length > 0) {
			auto sz = min(bytes.length, 512*1024);
			auto ret = () @trusted { return mixin("."~op)(cast(int)file, bytes.ptr, cast(uint)sz); } ();
			if (ret != sz) {
				// Account for the part of the chunk that was transferred,
				// so that the reported byte count is accurate.
				if (ret > 0) bytes = bytes[cast(size_t)ret .. $];
				safeAtomicStore(f.ioStatus, IOStatus.error);
				log("error");
				break;
			}
			bytes = bytes[sz .. $];
			log("check for cancel");
			if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return;
		}

		safeAtomicStore(f.bytesWritten, buffer.length - bytes.length);
		log("wait for status set");
		// The caller may switch the slot to "cancelling" at any time, so
		// retry until one of the two possible transitions succeeds.
		while (true) {
			if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break;
			if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) break;
		}
	}

	/// Event handler that finalizes all active reads and writes which have
	/// reached a terminal state and then re-arms the wait if necessary.
	private void onReady(EventID)
	{
		log("ready event");
		foreach (f; m_activeReads) {
			auto fd = FileFD(f, m_files[f].validationCounter);
			m_files[f].read.finalize(fd, { m_activeReads.remove(f); });
		}

		foreach (f; m_activeWrites) {
			auto fd = FileFD(f, m_files[f].validationCounter);
			m_files[f].write.finalize(fd, { m_activeWrites.remove(f); });
		}

		m_waiting = false;
		startWaiting();
	}

	/// Starts waiting for the ready event if operations are pending and no
	/// wait is active yet.
	private void startWaiting()
	{
		if (!m_waiting && (!m_activeWrites.empty || !m_activeReads.empty)) {
			log("wait for ready");
			m_events.wait(m_readyEvent, &onReady);
			m_waiting = true;
		}
	}

	/// Lazily creates the ready event and acquires the worker pool.
	private void threadSetup()
	{
		if (m_readyEvent == EventID.invalid) {
			log("create file event");
			m_readyEvent = m_events.create();
		}
		if (!m_fileThreadPool) {
			log("aquire thread pool");
			m_fileThreadPool = acquireIOWorkerPool();
		}
	}
}
476 
/// `@trusted` wrapper around `atomicLoad`, callable from `@safe` code.
private auto safeAtomicLoad(T)(ref shared(T) v)
@trusted {
	return atomicLoad(v);
}

/// `@trusted` wrapper around `atomicStore`, callable from `@safe` code.
private auto safeAtomicStore(T)(ref shared(T) v, T a)
@trusted {
	return atomicStore(v, a);
}

/// `@trusted` wrapper around `cas`: replaces the value of `v` with `b` if it
/// currently equals `a` and returns whether the exchange took place.
private auto safeCAS(T, U, V)(ref shared(T) v, U a, V b)
@trusted {
	return cas(&v, a, b);
}
480 
/// Debug logging helper. Prints the name of the current thread followed by
/// the formatted message, but only in builds with the `EventCoreLogFiles`
/// debug identifier set; otherwise the call does nothing.
private void log(ARGS...)(string fmt, ARGS args)
@trusted nothrow {
	debug (EventCoreLogFiles) {
		import std.stdio : writef, writefln;
		import core.thread : Thread;

		// A failure to write the log output is treated as fatal, which also
		// keeps the function nothrow.
		scope (failure) assert(false);

		auto thread_name = Thread.getThis().name;
		writef("[%s] ", thread_name);
		writefln(fmt, args);
	}
}