1 /**
2 	Efficient generic management of large numbers of timers.
3 */
4 module eventcore.drivers.timer;
5 
6 import eventcore.driver;
7 import eventcore.internal.consumablequeue;
8 import eventcore.internal.dlist;
9 import eventcore.internal.utils : mallocT, freeT, nogc_assert;
10 import core.time : Duration, MonoTime, hnsecs;
11 
12 final class LoopTimeoutTimerDriver : EventDriverTimers {
13 	import std.experimental.allocator.building_blocks.free_list;
14 	import std.experimental.allocator.building_blocks.region;
15 	import std.experimental.allocator.mallocator;
16 	import std.experimental.allocator : dispose, make;
17 	import std.container.array;
18 	import std.datetime : Clock;
19 	import std.range : SortedRange, assumeSorted, take;
20 	import core.memory : GC;
21 
22 	private {
23 		static FreeList!(Mallocator, TimerSlot.sizeof) ms_allocator;
24 		TimerSlot*[TimerID] m_timers;
25 		StackDList!TimerSlot m_timerQueue;
26 		TimerID m_lastTimerID;
27 		ConsumableQueue!(TimerSlot*) m_firedTimers;
28 	}
29 
30 	static this()
31 	{
32 		ms_allocator.parent = Mallocator.instance;
33 	}
34 
35 	this()
36 	@nogc @safe nothrow {
37 		m_firedTimers = mallocT!(ConsumableQueue!(TimerSlot*));
38 	}
39 
40 	~this()
41 	@nogc @trusted nothrow {
42 		try freeT(m_firedTimers);
43 		catch (Exception e) assert(false, e.msg);
44 	}
45 
46 	package @property size_t pendingCount() const @safe nothrow { return m_timerQueue.length; }
47 
48 	final package Duration getNextTimeout(MonoTime time)
49 	@safe nothrow {
50 		if (m_timerQueue.empty) return Duration.max;
51 		return m_timerQueue.front.timeout - time;
52 	}
53 
54 	final package bool process(MonoTime time)
55 	@trusted nothrow {
56 		assert(m_firedTimers.length == 0);
57 		if (m_timerQueue.empty) return false;
58 
59 		foreach (tm; m_timerQueue[]) {
60 			if (tm.timeout > time) break;
61 			if (tm.repeatDuration > Duration.zero) {
62 				do tm.timeout += tm.repeatDuration;
63 				while (tm.timeout <= time);
64 			} else tm.pending = false;
65 			m_firedTimers.put(tm);
66 		}
67 
68 		auto processed_timers = m_firedTimers.consume();
69 
70 		foreach (tm; processed_timers) {
71 			m_timerQueue.remove(tm);
72 			if (tm.repeatDuration > Duration.zero)
73 				enqueueTimer(tm);
74 		}
75 
76 		foreach (tm; processed_timers) {
77 			auto cb = tm.callback;
78 			tm.callback = null;
79 			if (cb) {
80 				cb(tm.id, true);
81 				releaseRef(tm.id);
82 			}
83 		}
84 
85 		return processed_timers.length > 0;
86 	}
87 
88 	final override TimerID create()
89 	@trusted {
90 		auto id = TimerID(++m_lastTimerID, 0);
91 		TimerSlot* tm;
92 		try tm = ms_allocator.make!TimerSlot;
93 		catch (Exception e) return TimerID.invalid;
94 		assert(tm !is null);
95 		GC.addRange(tm, TimerSlot.sizeof, typeid(TimerSlot));
96 		tm.id = id;
97 		tm.refCount = 1;
98 		tm.timeout = MonoTime.max;
99 		m_timers[id] = tm;
100 		return id;
101 	}
102 
103 	final override void set(TimerID timer, Duration timeout, Duration repeat)
104 	@trusted {
105 		if (!isValid(timer)) return;
106 
107 		scope (failure) assert(false);
108 		auto tm = m_timers[timer];
109 		if (tm.pending) stop(timer);
110 		tm.timeout = MonoTime.currTime + timeout;
111 		tm.repeatDuration = repeat;
112 		tm.pending = true;
113 		enqueueTimer(tm);
114 	}
115 
116 	final override void stop(TimerID timer)
117 	@trusted {
118 		import std.algorithm.mutation : swap;
119 
120 		if (!isValid(timer)) return;
121 
122 		auto tm = m_timers[timer];
123 		if (!tm.pending) return;
124 		TimerCallback2 cb;
125 		swap(cb, tm.callback);
126 		if (cb) {
127 			cb(timer, false);
128 			releaseRef(timer);
129 		}
130 		tm.pending = false;
131 		m_timerQueue.remove(tm);
132 	}
133 
134 	final override bool isPending(TimerID descriptor)
135 	{
136 		if (!isValid(descriptor)) return false;
137 
138 		return m_timers[descriptor].pending;
139 	}
140 
141 	final override bool isPeriodic(TimerID descriptor)
142 	{
143 		if (!isValid(descriptor)) return false;
144 
145 		return m_timers[descriptor].repeatDuration > Duration.zero;
146 	}
147 
148 	final override void wait(TimerID timer, TimerCallback2 callback)
149 	{
150 		if (!isValid(timer)) return;
151 
152 		assert(!m_timers[timer].callback, "Calling wait() on a timer that is already waiting.");
153 		m_timers[timer].callback = callback;
154 		addRef(timer);
155 	}
156 	alias wait = EventDriverTimers.wait;
157 
158 	final override void cancelWait(TimerID timer)
159 	{
160 		if (!isValid(timer)) return;
161 
162 		auto pt = m_timers[timer];
163 		assert(pt.callback);
164 		pt.callback = null;
165 		releaseRef(timer);
166 	}
167 
168 	override bool isValid(TimerID handle)
169 	const {
170 		return (handle in m_timers) !is null;
171 	}
172 
173 	final override void addRef(TimerID descriptor)
174 	{
175 		if (!isValid(descriptor)) return;
176 
177 		m_timers[descriptor].refCount++;
178 	}
179 
180 	final override bool releaseRef(TimerID descriptor)
181 	{
182 		if (!isValid(descriptor)) return true;
183 
184 		auto tm = m_timers[descriptor];
185 		tm.refCount--;
186 
187 		// cancel starved timer waits
188 		if (tm.callback && tm.refCount == 1 && !tm.pending) {
189 			debug addRef(descriptor);
190 			cancelWait(tm.id);
191 			debug {
192 				assert(tm.refCount == 1);
193 				releaseRef(descriptor);
194 			}
195 			return false;
196 		}
197 
198 		if (!tm.refCount) {
199 			if (tm.pending) stop(tm.id);
200 			m_timers.remove(descriptor);
201 			() @trusted {
202 				scope (failure) assert(false);
203 				ms_allocator.dispose(tm);
204 				GC.removeRange(tm);
205 			} ();
206 
207 			return false;
208 		}
209 
210 		return true;
211 	}
212 
213 	final bool isUnique(TimerID descriptor)
214 	const {
215 		if (!isValid(descriptor)) return false;
216 
217 		return m_timers[descriptor].refCount == 1;
218 	}
219 
220 	protected final override void* rawUserData(TimerID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
221 	@system {
222 		if (!isValid(descriptor)) return null;
223 
224 		TimerSlot* fds = m_timers[descriptor];
225 		assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
226 			"Requesting user data with differing type (destructor).");
227 		assert(size <= TimerSlot.userData.length, "Requested user data is too large.");
228 		if (size > TimerSlot.userData.length) assert(false);
229 		if (!fds.userDataDestructor) {
230 			initialize(fds.userData.ptr);
231 			fds.userDataDestructor = destroy;
232 		}
233 		return fds.userData.ptr;
234 	}
235 
236 	private void enqueueTimer(TimerSlot* tm)
237 	nothrow {
238 		TimerSlot* ns;
239 		foreach_reverse (t; m_timerQueue[])
240 			if (t.timeout <= tm.timeout) {
241 				ns = t;
242 				break;
243 			}
244 
245 		if (ns) m_timerQueue.insertAfter(tm, ns);
246 		else m_timerQueue.insertFront(tm);
247 	}
248 }
249 
250 struct TimerSlot {
251 	TimerSlot* prev, next;
252 	TimerID id;
253 	uint refCount;
254 	bool pending;
255 	MonoTime timeout;
256 	Duration repeatDuration;
257 	TimerCallback2 callback; // TODO: use a list with small-value optimization
258 
259 	DataInitializer userDataDestructor;
260 	ubyte[16*size_t.sizeof] userData;
261 }