1 /** Provides a shared task pool for distributing tasks to worker threads.
2 */
3 module eventcore.internal.ioworker;
4 
5 import eventcore.internal.utils;
6 
7 import std.parallelism : TaskPool, Task, task;
8 
9 
10 IOWorkerPool acquireIOWorkerPool()
11 @safe nothrow {
12 	return IOWorkerPool(true);
13 }
14 
15 struct IOWorkerPool {
16 	private {
17 		TaskPool m_pool;
18 	}
19 
20 	@safe nothrow:
21 
22 	private this(bool) { m_pool = StaticTaskPool.addRef(); }
23 	~this() { if (m_pool) StaticTaskPool.releaseRef(); }
24 	this(this) { if (m_pool) StaticTaskPool.addRef(); }
25 
26 	bool opCast(T)() const if (is(T == bool)) { return !!m_pool; }
27 
28 	@property TaskPool pool() { return m_pool; }
29 
30 	alias pool this;
31 
32 	auto run(alias fun, ARGS...)(ARGS args)
33 	{
34 		auto t = task!(fun, ARGS)(args);
35 		try m_pool.put(t);
36 		catch (Exception e) assert(false, e.msg);
37 		return t;
38 	}
39 }
40 
41 // Maintains a single thread pool shared by all driver instances (threads)
42 private struct StaticTaskPool {
43 	import core.sync.mutex : Mutex;
44 
45 	private {
46 		static shared Mutex m_mutex;
47 		static __gshared TaskPool m_pool;
48 		static __gshared int m_refCount = 0;
49 	}
50 
51 	shared static this()
52 	{
53 		m_mutex = new shared Mutex;
54 	}
55 
56 	static TaskPool addRef()
57 	@trusted nothrow {
58 		m_mutex.lock_nothrow();
59 		scope (exit) m_mutex.unlock_nothrow();
60 
61 		if (!m_refCount++) {
62 			try {
63 				m_pool = mallocT!TaskPool(4);
64 				m_pool.isDaemon = true;
65 			} catch (Exception e) {
66 				assert(false, e.msg);
67 			}
68 		}
69 
70 		return m_pool;
71 	}
72 
73 	static void releaseRef()
74 	@trusted nothrow {
75 		TaskPool fin_pool;
76 
77 		{
78 			m_mutex.lock_nothrow();
79 			scope (exit) m_mutex.unlock_nothrow();
80 
81 			if (!--m_refCount) {
82 				fin_pool = m_pool;
83 				m_pool = null;
84 			}
85 		}
86 
87 		if (fin_pool) {
88 			//log("finishing thread pool");
89 			try {
90 				fin_pool.finish(true);
91 				freeT(fin_pool);
92 			} catch (Exception e) {
93 				//log("Failed to shut down file I/O thread pool.");
94 			}
95 		}
96 	}
97 }