1 /** Provides a shared task pool for distributing tasks to worker threads. 2 */ 3 module eventcore.internal.ioworker; 4 5 import eventcore.internal.utils; 6 7 import std.parallelism : TaskPool, Task, task; 8 9 10 IOWorkerPool acquireIOWorkerPool() 11 @safe nothrow { 12 return IOWorkerPool(true); 13 } 14 15 struct IOWorkerPool { 16 private { 17 TaskPool m_pool; 18 } 19 20 @safe nothrow: 21 22 private this(bool) { m_pool = StaticTaskPool.addRef(); } 23 ~this() { if (m_pool) StaticTaskPool.releaseRef(); } 24 this(this) { if (m_pool) StaticTaskPool.addRef(); } 25 26 bool opCast(T)() const if (is(T == bool)) { return !!m_pool; } 27 28 @property TaskPool pool() { return m_pool; } 29 30 alias pool this; 31 32 auto run(alias fun, ARGS...)(ARGS args) 33 { 34 auto t = task!(fun, ARGS)(args); 35 try m_pool.put(t); 36 catch (Exception e) assert(false, e.msg); 37 return t; 38 } 39 } 40 41 // Maintains a single thread pool shared by all driver instances (threads) 42 private struct StaticTaskPool { 43 import core.sync.mutex : Mutex; 44 45 private { 46 static shared Mutex m_mutex; 47 static __gshared TaskPool m_pool; 48 static __gshared int m_refCount = 0; 49 } 50 51 shared static this() 52 { 53 m_mutex = new shared Mutex; 54 } 55 56 static TaskPool addRef() 57 @trusted nothrow { 58 m_mutex.lock_nothrow(); 59 scope (exit) m_mutex.unlock_nothrow(); 60 61 if (!m_refCount++) { 62 try { 63 m_pool = mallocT!TaskPool(4); 64 m_pool.isDaemon = true; 65 } catch (Exception e) { 66 assert(false, e.msg); 67 } 68 } 69 70 return m_pool; 71 } 72 73 static void releaseRef() 74 @trusted nothrow { 75 TaskPool fin_pool; 76 77 { 78 m_mutex.lock_nothrow(); 79 scope (exit) m_mutex.unlock_nothrow(); 80 81 if (!--m_refCount) { 82 fin_pool = m_pool; 83 m_pool = null; 84 } 85 } 86 87 if (fin_pool) { 88 //log("finishing thread pool"); 89 try { 90 fin_pool.finish(true); 91 freeT(fin_pool); 92 } catch (Exception e) { 93 //log("Failed to shut down file I/O thread pool."); 94 } 95 } 96 } 97 }