1 module eventcore.internal.consumablequeue;
2 
3 import eventcore.internal.utils : mallocNT, freeNT;
4 
5 
6 /** FIFO queue with support for chunk-wise consumption.
7 */
8 final class ConsumableQueue(T)
9 {
10 	@safe nothrow:
11 
12 	private {
13 		struct Slot {
14 			T value;
15 			uint rc;
16 		}
17 		Slot[] m_storage;
18 		size_t m_capacityMask;
19 		size_t m_first;
20 		size_t m_consumedCount;
21 		size_t m_pendingCount;
22 	}
23 
24 	~this()
25 	@trusted @nogc nothrow {
26 		if (m_storage !is null)
27 			freeNT(m_storage);
28 	}
29 
30 	@property size_t length() const { return m_pendingCount; }
31 
32 	@property bool empty() const { return length == 0; }
33 
34 	/** Inserts a single element into the queue.
35 	*/
36 	@safe void put(T element)
37 	{
38 		reserve(1);
39 		auto idx = (m_first + m_consumedCount + m_pendingCount++) & m_capacityMask;
40 		m_storage[idx] = Slot(element, 0);
41 	}
42 
43 	/** Reserves space for inserting at least `count` elements.
44 	*/
45 	void reserve(size_t count)
46 	@safe {
47 		auto min_capacity = m_consumedCount + m_pendingCount + count;
48 		if (min_capacity <= m_storage.length)
49 			return;
50 
51 		auto new_capacity = m_storage.length ? m_storage.length : 16;
52 		while (new_capacity < min_capacity) new_capacity *= 2;
53 		auto new_capacity_mask = new_capacity - 1;
54 
55 		auto new_storage = mallocNT!Slot(new_capacity);
56 		foreach (i; 0 .. m_consumedCount + m_pendingCount)
57 			new_storage[(m_first + i) & new_capacity_mask] = m_storage[(m_first + i) & m_capacityMask];
58 
59 		() @trusted {
60 			if (m_storage !is null)
61 				freeNT(m_storage);
62 			m_storage = new_storage;
63 		} ();
64 		m_capacityMask = new_capacity_mask;
65 	}
66 
67 	void removePending(T item)
68 	{
69 		foreach (i; 0 .. m_pendingCount)
70 			if (getPendingAt(i) == item) {
71 				if (m_pendingCount > 1)
72 					getPendingAt(i) = getPendingAt(m_pendingCount-1);
73 				m_pendingCount--;
74 				break;
75 			}
76 	}
77 
78 	/** Consumes all elements of the queue and returns a range containing the
79 		consumed elements.
80 
81 		Any elements added after the call to `consume` will not show up in the
82 		returned range.
83 	*/
84 	ConsumedRange consume()
85 	@safe {
86 		if (!m_pendingCount) return ConsumedRange(null, 0, 0);
87 		auto first = (m_first + m_consumedCount) % m_storage.length;
88 		auto count = m_pendingCount;
89 		m_consumedCount += count;
90 		m_pendingCount = 0;
91 		return ConsumedRange(this, first, count);
92 	}
93 
94 	T consumeOne()
95 	{
96 		assert(!empty);
97 		auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value;
98 		if (m_consumedCount) m_consumedCount++;
99 		else m_first = (m_first + 1) & m_capacityMask;
100 		m_pendingCount--;
101 		return ret;
102 	}
103 
104 	static struct ConsumedRange {
105 		nothrow:
106 
107 		private {
108 			ConsumableQueue m_queue;
109 			size_t m_first;
110 			size_t m_count;
111 		}
112 
113 		this(ConsumableQueue queue, size_t first, size_t count)
114 		{
115 			if (count) {
116 				m_queue = queue;
117 				m_first = first;
118 				m_count = count;
119 				m_queue.m_storage[first & m_queue.m_capacityMask].rc++;
120 			}
121 		}
122 
123 		this(this)
124 		{
125 			if (m_count)
126 				m_queue.m_storage[m_first & m_queue.m_capacityMask].rc++;
127 		}
128 
129 		~this()
130 		{
131 			if (m_count)
132 				m_queue.consumed(m_first, false);
133 		}
134 
135 		@property ConsumedRange save() { return this; }
136 
137 		@property bool empty() const { return m_count == 0; }
138 
139 		@property size_t length() const { return m_count; }
140 
141 		@property ref inout(T) front() inout { return m_queue.m_storage[m_first & m_queue.m_capacityMask].value; }
142 
143 		void popFront()
144 		{
145 			m_queue.consumed(m_first, m_count > 1);
146 			m_first++;
147 			m_count--;
148 		}
149 
150 		ref inout(T) opIndex(size_t idx) inout { return m_queue.m_storage[(m_first + idx) & m_queue.m_capacityMask].value; }
151 
152 		int opApply(scope int delegate(ref T) @safe nothrow del)
153 		{
154 			foreach (i; 0 .. m_count)
155 				if (auto ret = del(m_queue.m_storage[(m_first + i) & m_queue.m_capacityMask].value))
156 					return ret;
157 			return 0;
158 		}
159 	}
160 
161 	private void consumed(size_t first, bool shift_up)
162 	{
163 		if (shift_up) {
164 			m_storage[(first+1) & m_capacityMask].rc++;
165 			if (!--m_storage[first & m_capacityMask].rc && first == m_first) {
166 				m_first++;
167 				m_consumedCount--;
168 			}
169 		} else {
170 			m_storage[first & m_capacityMask].rc--;
171 			if (first == m_first)
172 				while (m_consumedCount > 0 && !m_storage[m_first & m_capacityMask].rc) {
173 					m_first++;
174 					m_consumedCount--;
175 				}
176 		}
177 		m_first = m_first & m_capacityMask;
178 	}
179 
180 	private ref T getPendingAt(size_t idx)
181 	{
182 		assert(idx < m_pendingCount, "Pending item index out of bounds.");
183 		return m_storage[(m_first + m_consumedCount + idx) & m_capacityMask].value;
184 	}
185 }
186 
187 ///
188 unittest {
189 	import std.algorithm.comparison : equal;
190 	auto q = new ConsumableQueue!int;
191 
192 	q.put(1);
193 	q.put(2);
194 	q.put(3);
195 	assert(q.m_consumedCount == 0 && q.m_pendingCount == 3);
196 
197 	auto r1 = q.consume;
198 	assert(r1.length == 3);
199 	assert(q.m_consumedCount == 3 && q.m_pendingCount == 0);
200 
201 	q.put(4);
202 	q.put(5);
203 	assert(q.m_consumedCount == 3 && q.m_pendingCount == 2);
204 
205 	auto r2 = q.consume;
206 	assert(r2.length == 2);
207 	assert(q.m_consumedCount == 5 && q.m_pendingCount == 0);
208 
209 	q.put(6);
210 	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
211 
212 	auto r3 = r1.save;
213 	assert(r3.length == 3);
214 
215 	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
216 	assert((&r2).equal([4, 5]));
217 	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
218 	assert((&r1).equal([1, 2, 3]));
219 	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
220 	assert((&r3).equal([1, 2, 3]));
221 	assert(q.m_consumedCount == 0 && q.m_pendingCount == 1);
222 
223 	assert(q.length == 1);
224 	assert(q.consumeOne == 6);
225 	assert(q.length == 0);
226 	assert(q.m_consumedCount == 0);
227 
228 	foreach (i; 7 .. 15) q.put(i);
229 	assert(q.consume.equal([7, 8, 9, 10, 11, 12, 13, 14]));
230 	q.put(15);
231 	assert(q.consume.equal([15]));
232 	q.put(16);
233 	assert(q.consume.equal([16]));
234 	q.put(17);
235 	assert(q.consume.equal([17]));
236 	assert(q.consume.empty);
237 }
238 
239 unittest {
240 	import std.range : iota;
241 	import std.algorithm.comparison : equal;
242 
243 	auto q = new ConsumableQueue!int;
244 	foreach (i; 0 .. 14)
245 		q.put(i);
246 	assert(q.consume().equal(iota(14)));
247 	foreach (i; 0 .. 4)
248 		q.put(i);
249 	assert(q.consume().equal(iota(4)));
250 }
251 
252 
253 void filterPending(alias pred, T)(ConsumableQueue!T q)
254 {
255 	size_t ir = 0;
256 	size_t iw = 0;
257 
258 	while (ir < q.m_pendingCount) {
259 		if (!pred(q.getPendingAt(ir))) {
260 		} else {
261 			if (ir != iw) q.getPendingAt(iw) = q.getPendingAt(ir);
262 			iw++;
263 		}
264 		ir++;
265 	}
266 	q.m_pendingCount = iw;
267 }
268 
269 
270 unittest {
271 	import std.algorithm.comparison : equal;
272 	import std.range : only;
273 
274 	auto q = new ConsumableQueue!int;
275 	foreach (i; 0 .. 14) q.put(i);
276 	q.filterPending!(i => i % 2 != 0);
277 	assert(q.consume().equal(only(1, 3, 5, 7, 9, 11, 13)));
278 
279 	foreach (i; 0 .. 14) q.put(i);
280 	q.filterPending!(i => i % 3 == 1);
281 	assert(q.consume().equal(only(1, 4, 7, 10, 13)));
282 }