/**
	Efficient generic management of large numbers of timers.
*/
module eventcore.drivers.timer;

import eventcore.driver;
import eventcore.internal.consumablequeue;
import eventcore.internal.dlist;
import eventcore.internal.utils : mallocT, freeT, nogc_assert;
import core.time : Duration, MonoTime, hnsecs;


/** Timer driver that keeps all armed timers in a queue ordered by timeout.

	Timers are reference counted slots (`TimerSlot`) that are looked up by
	their `TimerID`. Armed timers are additionally linked into `m_timerQueue`,
	which `enqueueTimer` keeps in ascending timeout order. `getNextTimeout`
	and `process` are the `package` level entry points used to drive the
	timers (presumably from the event loop - see the callers).
*/
final class LoopTimeoutTimerDriver : EventDriverTimers {
	import std.experimental.allocator.building_blocks.free_list;
	import std.experimental.allocator.building_blocks.region;
	import std.experimental.allocator.mallocator;
	import std.experimental.allocator : dispose, make;
	import std.container.array;
	import std.datetime : Clock;
	import std.range : SortedRange, assumeSorted, take;
	import core.memory : GC;

	private {
		// allocator used for all timer slots
		static FreeList!(Mallocator, TimerSlot.sizeof) ms_allocator;
		// all existing timers, armed or not
		TimerSlot*[TimerID] m_timers;
		// armed timers, ordered by ascending timeout
		StackDList!TimerSlot m_timerQueue;
		// counter used to generate timer IDs
		TimerID m_lastTimerID;
		// scratch queue holding the timers that fired during `process`
		ConsumableQueue!(TimerSlot*) m_firedTimers;
	}

	static this()
	{
		ms_allocator.parent = Mallocator.instance;
	}

	/// Allocates the scratch queue used by `process`.
	this()
	@nogc @safe nothrow {
		m_firedTimers = mallocT!(ConsumableQueue!(TimerSlot*));
	}

	/// Frees the scratch queue allocated by the constructor.
	~this()
	@nogc @trusted nothrow {
		try freeT(m_firedTimers);
		catch (Exception e) assert(false, e.msg);
	}

	/// Number of timers that are currently linked into the timer queue.
	package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; }

	/** Computes the time span between `time` and the earliest armed timer.

		Params:
			time = The reference time point

		Returns:
			`Duration.max` if no timer is armed, otherwise the timeout of the
			first queued timer minus `time`. The result is zero or negative
			if that timer is already due.
	*/
	final package Duration getNextTimeout(MonoTime time)
	@safe nothrow {
		if (m_timerQueue.empty) return Duration.max;
		return m_timerQueue.front.timeout - time;
	}

	/** Fires all timers whose timeout is not later than `time`.

		Periodic timers get their timeout advanced past `time` and are
		re-queued; one-shot timers are marked as not pending and are removed
		from the queue. The wait callback of each fired timer, if any, is
		then invoked with `true` and the reference added by `wait` is
		released.

		Params:
			time = The current time point

		Returns:
			`true` if at least one timer has fired.
	*/
	final package bool process(MonoTime time)
	@trusted nothrow {
		assert(m_firedTimers.length == 0);
		if (m_timerQueue.empty) return false;

		// Phase 1: collect the due timers. The queue is ordered, so the scan
		// can stop at the first timer that lies in the future.
		foreach (tm; m_timerQueue[]) {
			if (tm.timeout > time) break;
			if (tm.repeatDuration > Duration.zero) {
				// skip all periods that have already elapsed
				do tm.timeout += tm.repeatDuration;
				while (tm.timeout <= time);
			} else tm.pending = false;
			m_firedTimers.put(tm);
		}

		auto processed_timers = m_firedTimers.consume();

		// Phase 2: unlink the fired timers and re-insert the periodic ones at
		// the position matching their new timeout.
		foreach (tm; processed_timers) {
			m_timerQueue.remove(tm);
			if (tm.repeatDuration > Duration.zero)
				enqueueTimer(tm);
		}

		// Phase 3: run the callbacks only after the queue is consistent again,
		// so that they can safely call back into the driver.
		// NOTE(review): `processed_timers` holds raw slot pointers without an
		// extra reference. A callback that releases the last reference of
		// another timer of the same batch would leave a dangling pointer
		// here - confirm whether callers can do that.
		foreach (tm; processed_timers) {
			auto cb = tm.callback;
			tm.callback = null;
			if (cb) {
				cb(tm.id, true);
				// drop the reference that was added by wait()
				releaseRef(tm.id);
			}
		}

		return processed_timers.length > 0;
	}

	/** Creates a new, unarmed timer with a reference count of one.

		Returns:
			The ID of the new timer, or `TimerID.invalid` if the timer slot
			could not be allocated.
	*/
	final override TimerID create()
	@trusted {
		TimerSlot* tm;
		try tm = ms_allocator.make!TimerSlot;
		catch (Exception e) return TimerID.invalid;
		// make() reports allocation failure by returning null; an assert
		// alone would let release builds dereference a null pointer
		if (tm is null) return TimerID.invalid;

		auto id = TimerID(++m_lastTimerID, 0);
		// register the slot with the GC so that it is scanned
		GC.addRange(tm, TimerSlot.sizeof, typeid(TimerSlot));
		tm.id = id;
		tm.refCount = 1;
		tm.timeout = MonoTime.max;
		m_timers[id] = tm;
		return id;
	}

	/** Arms a timer, stopping it first if it is already pending.

		Invalid timer IDs are ignored.

		Params:
			timer = ID of the timer to arm
			timeout = Time span from now until the timer fires
			repeat = Period of the timer; values that are not greater than
				zero result in a one-shot timer
	*/
	final override void set(TimerID timer, Duration timeout, Duration repeat)
	@trusted {
		if (!isValid(timer)) return;

		scope (failure) assert(false);
		auto tm = m_timers[timer];
		if (tm.pending) {
			stop(timer);
			// stop() runs the wait callback and releases the wait's
			// reference, which may have destroyed the timer
			if (!isValid(timer)) return;
		}
		tm.timeout = MonoTime.currTime + timeout;
		tm.repeatDuration = repeat;
		tm.pending = true;
		enqueueTimer(tm);
	}

	/** Disarms a pending timer.

		A wait callback that is registered for the timer is invoked with
		`false` and the reference added by `wait` is released. Invalid or
		non-pending timers are ignored.

		Params:
			timer = ID of the timer to stop
	*/
	final override void stop(TimerID timer)
	@trusted {
		import std.algorithm.mutation : swap;

		if (!isValid(timer)) return;

		auto tm = m_timers[timer];
		if (!tm.pending) return;
		TimerCallback2 cb;
		swap(cb, tm.callback);
		if (cb) {
			cb(timer, false);
			releaseRef(timer);
			// Releasing the wait's reference may have destroyed the timer,
			// in which case releaseRef() has already unlinked and freed the
			// slot - it must not be touched anymore.
			if (!isValid(timer)) return;
			// The callback may have stopped the timer on its own, so that
			// the slot is already unlinked from the queue.
			if (!tm.pending) return;
		}
		tm.pending = false;
		m_timerQueue.remove(tm);
	}

	/// Returns `true` if the timer exists and is currently armed.
	final override bool isPending(TimerID descriptor)
	{
		if (!isValid(descriptor)) return false;

		return m_timers[descriptor].pending;
	}

	/// Returns `true` if the timer exists and has a repeat duration greater than zero.
	final override bool isPeriodic(TimerID descriptor)
	{
		if (!isValid(descriptor)) return false;

		return m_timers[descriptor].repeatDuration > Duration.zero;
	}

	/** Registers the callback that is invoked when the timer fires or is stopped.

		The timer's reference count is incremented for the duration of the
		wait. Only a single wait can be active per timer. Invalid timer IDs
		are ignored.

		Params:
			timer = ID of the timer to wait on
			callback = Invoked with the timer ID and a flag that is `true` if
				the timer has fired and `false` if it was stopped
	*/
	final override void wait(TimerID timer, TimerCallback2 callback)
	{
		if (!isValid(timer)) return;

		assert(!m_timers[timer].callback, "Calling wait() on a timer that is already waiting.");
		m_timers[timer].callback = callback;
		addRef(timer);
	}
	alias wait = EventDriverTimers.wait;

	/** Removes the wait callback without invoking it.

		Releases the reference that was added by `wait`. Invalid timer IDs
		are ignored.
	*/
	final override void cancelWait(TimerID timer)
	{
		if (!isValid(timer)) return;

		auto pt = m_timers[timer];
		assert(pt.callback);
		pt.callback = null;
		releaseRef(timer);
	}

	/// Returns `true` if `handle` refers to an existing timer of this driver.
	override bool isValid(TimerID handle)
	const {
		return (handle in m_timers) !is null;
	}

	/// Increments the reference count of the timer; invalid IDs are ignored.
	final override void addRef(TimerID descriptor)
	{
		if (!isValid(descriptor)) return;

		m_timers[descriptor].refCount++;
	}

	/** Decrements the reference count of the timer.

		If only the reference of an active wait is left and the timer is not
		armed, the wait can never complete and is canceled, which destroys
		the timer. When the count reaches zero, the timer is stopped, its
		user data is destroyed and the slot is freed.

		Returns:
			`false` if the timer has been destroyed by this call, `true` if
			it is still alive or if `descriptor` is not a valid timer.
	*/
	final override bool releaseRef(TimerID descriptor)
	{
		if (!isValid(descriptor)) return true;

		auto tm = m_timers[descriptor];
		tm.refCount--;

		// cancel starved timer waits
		if (tm.callback && tm.refCount == 1 && !tm.pending) {
			// In debug builds the slot is kept alive across cancelWait(), so
			// that the reference count can be verified before the final
			// release; in both build types the timer ends up destroyed.
			debug addRef(descriptor);
			cancelWait(tm.id);
			debug {
				assert(tm.refCount == 1);
				releaseRef(descriptor);
			}
			return false;
		}

		if (!tm.refCount) {
			if (tm.pending) stop(tm.id);
			m_timers.remove(descriptor);
			() @trusted {
				scope (failure) assert(false);
				// run the destructor registered through rawUserData(), so
				// that the user data does not leak
				if (tm.userDataDestructor)
					tm.userDataDestructor(tm.userData.ptr);
				// unregister from the GC before the memory is handed back
				// to the allocator
				GC.removeRange(tm);
				ms_allocator.dispose(tm);
			} ();

			return false;
		}

		return true;
	}

	/// Returns `true` if the timer exists and has a reference count of exactly one.
	final bool isUnique(TimerID descriptor)
	const {
		if (!isValid(descriptor)) return false;

		return m_timers[descriptor].refCount == 1;
	}

	/** Returns the user data storage of a timer, initializing it on first use.

		Params:
			descriptor = ID of the timer
			size = Number of bytes required; must not exceed the size of
				`TimerSlot.userData`
			initialize = Called with the storage pointer on first access
			destroy = Destructor that gets stored together with the data; it
				must be the same for all requests on a timer

		Returns:
			Pointer to the storage or `null` if `descriptor` is invalid.
	*/
	protected final override void* rawUserData(TimerID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
	@system {
		if (!isValid(descriptor)) return null;

		TimerSlot* fds = m_timers[descriptor];
		assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
			"Requesting user data with differing type (destructor).");
		assert(size <= TimerSlot.userData.length, "Requested user data is too large.");
		// enforced in release builds, too, to never hand out too small storage
		if (size > TimerSlot.userData.length) assert(false);
		if (!fds.userDataDestructor) {
			initialize(fds.userData.ptr);
			fds.userDataDestructor = destroy;
		}
		return fds.userData.ptr;
	}

	/** Links a slot into the timer queue, keeping the queue ordered by timeout.

		The queue is searched from the back, and the slot is inserted after
		the last timer whose timeout is not later than its own, so that
		timers with equal timeouts keep their insertion order.
	*/
	private void enqueueTimer(TimerSlot* tm)
	nothrow {
		TimerSlot* ns;
		foreach_reverse (t; m_timerQueue[])
			if (t.timeout <= tm.timeout) {
				ns = t;
				break;
			}

		if (ns) m_timerQueue.insertAfter(tm, ns);
		else m_timerQueue.insertFront(tm);
	}
}

/// State of a single timer as managed by `LoopTimeoutTimerDriver`.
struct TimerSlot {
	TimerSlot* prev, next; // presumably the intrusive links used by StackDList - confirm
	TimerID id; // ID under which the slot is registered
	uint refCount; // number of references, including the one held by an active wait
	bool pending; // true while the timer is armed
	MonoTime timeout; // time point at which the timer fires next
	Duration repeatDuration; // period of the timer; not greater than zero for one-shot timers
	TimerCallback2 callback; // TODO: use a list with small-value optimization

	DataInitializer userDataDestructor; // set once the user data has been initialized
	ubyte[16*size_t.sizeof] userData; // raw storage handed out by rawUserData()
}