module eventcore.internal.consumablequeue;

import eventcore.internal.utils : mallocNT, freeNT;


/** FIFO queue with support for chunk-wise consumption.

	Elements live in a ring buffer whose capacity is always a power of two
	(it starts at 16 and is doubled on demand, see `reserve`). The buffer is
	divided into two adjacent regions, starting at `m_first`:

	$(UL
		$(LI `m_consumedCount` slots that were handed out through `consume`
			(or skipped by `consumeOne`) and are still waiting to be released)
		$(LI `m_pendingCount` slots that were `put` but not yet consumed)
	)

	All positions (`m_first` and the positions stored in `ConsumedRange`) are
	absolute counters that are never wrapped; they are reduced with
	`m_capacityMask` only at the moment the storage is indexed. This keeps
	outstanding ranges valid across `reserve` and makes position comparisons
	exact, even for ranges that cross the physical end of the buffer.
*/
final class ConsumableQueue(T)
{
	@safe nothrow:

	private {
		struct Slot {
			T value;
			// Number of `ConsumedRange` instances whose front is this slot.
			uint rc;
		}
		Slot[] m_storage;
		// `m_storage.length - 1`; valid as a bit mask because the capacity is a power of two
		size_t m_capacityMask;
		// Absolute (unwrapped) position of the oldest slot that is still in use
		size_t m_first;
		size_t m_consumedCount;
		size_t m_pendingCount;
	}

	~this()
	@trusted @nogc nothrow {
		if (m_storage !is null)
			freeNT(m_storage);
	}

	/// Number of pending (not yet consumed) elements.
	@property size_t length() const { return m_pendingCount; }

	/// Determines whether there are no pending elements.
	@property bool empty() const { return length == 0; }

	/** Inserts a single element into the queue.
	*/
	@safe void put(T element)
	{
		reserve(1);
		auto idx = (m_first + m_consumedCount + m_pendingCount++) & m_capacityMask;
		m_storage[idx] = Slot(element, 0);
	}

	/** Reserves space for inserting at least `count` elements.

		Slots that are consumed but not yet released count towards the
		required capacity. When the storage grows, every used slot is moved to
		the index that its absolute position maps to in the larger buffer, so
		that existing `ConsumedRange` instances stay valid.
	*/
	void reserve(size_t count)
	@safe {
		auto min_capacity = m_consumedCount + m_pendingCount + count;
		if (min_capacity <= m_storage.length)
			return;

		auto new_capacity = m_storage.length ? m_storage.length : 16;
		while (new_capacity < min_capacity) new_capacity *= 2;
		auto new_capacity_mask = new_capacity - 1;

		auto new_storage = mallocNT!Slot(new_capacity);
		foreach (i; 0 .. m_consumedCount + m_pendingCount)
			new_storage[(m_first + i) & new_capacity_mask] = m_storage[(m_first + i) & m_capacityMask];

		() @trusted {
			if (m_storage !is null)
				freeNT(m_storage);
			m_storage = new_storage;
		} ();
		m_capacityMask = new_capacity_mask;
	}

	/** Removes the first pending element that compares equal to `item`.

		The vacated position is filled with the most recently added pending
		element, so the relative order of the remaining pending elements is
		not preserved. Nothing happens if no pending element matches.
	*/
	void removePending(T item)
	{
		foreach (i; 0 .. m_pendingCount)
			if (getPendingAt(i) == item) {
				if (m_pendingCount > 1)
					getPendingAt(i) = getPendingAt(m_pendingCount-1);
				m_pendingCount--;
				break;
			}
	}

	/** Consumes all elements of the queue and returns a range containing the
		consumed elements.

		Any elements added after the call to `consume` will not show up in the
		returned range.
	*/
	ConsumedRange consume()
	@safe {
		if (!m_pendingCount) return ConsumedRange(null, 0, 0);
		// Hand out the absolute position. Wrapping it here would detach the
		// range from `m_first` once the buffer grows or the range crosses the
		// end of the storage.
		auto first = m_first + m_consumedCount;
		auto count = m_pendingCount;
		m_consumedCount += count;
		m_pendingCount = 0;
		return ConsumedRange(this, first, count);
	}

	/** Removes and returns the oldest pending element.

		The queue must not be empty. If earlier elements are still held by a
		`ConsumedRange`, the slot is only marked as consumed (with a reference
		count of zero) and gets reclaimed together with the preceding slots.
	*/
	T consumeOne()
	{
		assert(!empty);
		auto ret = m_storage[(m_first + m_consumedCount) & m_capacityMask].value;
		if (m_consumedCount) m_consumedCount++;
		else m_first++;
		m_pendingCount--;
		return ret;
	}

	/** Range over a chunk of consumed elements.

		Every instance holds one reference on the slot of its current front
		element. Slots are handed back to the queue once no range references
		them anymore and all older slots have been released.
	*/
	static struct ConsumedRange {
		nothrow:

		private {
			ConsumableQueue m_queue;
			// Absolute (unwrapped) position of the current front element
			size_t m_first;
			size_t m_count;
		}

		this(ConsumableQueue queue, size_t first, size_t count)
		{
			// An empty range stays in its `.init` state and never touches the queue
			if (count) {
				m_queue = queue;
				m_first = first;
				m_count = count;
				m_queue.m_storage[first & m_queue.m_capacityMask].rc++;
			}
		}

		this(this)
		{
			if (m_count)
				m_queue.m_storage[m_first & m_queue.m_capacityMask].rc++;
		}

		~this()
		{
			if (m_count)
				m_queue.consumed(m_first, false);
		}

		@property ConsumedRange save() { return this; }

		@property bool empty() const { return m_count == 0; }

		@property size_t length() const { return m_count; }

		@property ref inout(T) front()
		inout {
			assert(m_count > 0, "Accessing the front of an empty ConsumedRange.");
			return m_queue.m_storage[m_first & m_queue.m_capacityMask].value;
		}

		void popFront()
		{
			assert(m_count > 0, "Popping the front of an empty ConsumedRange.");
			// Move the reference to the next slot, unless this was the last element
			m_queue.consumed(m_first, m_count > 1);
			m_first++;
			m_count--;
		}

		ref inout(T) opIndex(size_t idx)
		inout {
			assert(idx < m_count, "ConsumedRange index out of bounds.");
			return m_queue.m_storage[(m_first + idx) & m_queue.m_capacityMask].value;
		}

		int opApply(scope int delegate(ref T) @safe nothrow del)
		{
			foreach (i; 0 .. m_count)
				if (auto ret = del(m_queue.m_storage[(m_first + i) & m_queue.m_capacityMask].value))
					return ret;
			return 0;
		}
	}

	/*	Releases the reference a range holds on the slot at position `first`.

		With `shift_up` set, the reference is moved on to the following slot
		(the range advances). Otherwise the reference is dropped entirely and,
		if `first` is the oldest used slot, all leading unreferenced consumed
		slots are handed back to the queue.

		`first` and `m_first` are both absolute positions, so they can be
		compared directly.
	*/
	private void consumed(size_t first, bool shift_up)
	{
		if (shift_up) {
			m_storage[(first+1) & m_capacityMask].rc++;
			if (!--m_storage[first & m_capacityMask].rc && first == m_first) {
				m_first++;
				m_consumedCount--;
			}
		} else {
			m_storage[first & m_capacityMask].rc--;
			if (first == m_first)
				while (m_consumedCount > 0 && !m_storage[m_first & m_capacityMask].rc) {
					m_first++;
					m_consumedCount--;
				}
		}
	}

	/*	Returns a reference to the pending element at index `idx`, where index
		zero is the oldest pending element.
	*/
	private ref T getPendingAt(size_t idx)
	{
		assert(idx < m_pendingCount, "Pending item index out of bounds.");
		return m_storage[(m_first + m_consumedCount + idx) & m_capacityMask].value;
	}
}

///
unittest {
	import std.algorithm.comparison : equal;
	auto q = new ConsumableQueue!int;

	q.put(1);
	q.put(2);
	q.put(3);
	assert(q.m_consumedCount == 0 && q.m_pendingCount == 3);

	auto r1 = q.consume;
	assert(r1.length == 3);
	assert(q.m_consumedCount == 3 && q.m_pendingCount == 0);

	q.put(4);
	q.put(5);
	assert(q.m_consumedCount == 3 && q.m_pendingCount == 2);

	auto r2 = q.consume;
	assert(r2.length == 2);
	assert(q.m_consumedCount == 5 && q.m_pendingCount == 0);

	q.put(6);
	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);

	auto r3 = r1.save;
	assert(r3.length == 3);

	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
	assert((&r2).equal([4, 5]));
	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
	assert((&r1).equal([1, 2, 3]));
	assert(q.m_consumedCount == 5 && q.m_pendingCount == 1);
	assert((&r3).equal([1, 2, 3]));
	assert(q.m_consumedCount == 0 && q.m_pendingCount == 1);

	assert(q.length == 1);
	assert(q.consumeOne == 6);
	assert(q.length == 0);
	assert(q.m_consumedCount == 0);

	foreach (i; 7 .. 15) q.put(i);
	assert(q.consume.equal([7, 8, 9, 10, 11, 12, 13, 14]));
	q.put(15);
	assert(q.consume.equal([15]));
	q.put(16);
	assert(q.consume.equal([16]));
	q.put(17);
	assert(q.consume.equal([17]));
	assert(q.consume.empty);
}

unittest {
	import std.range : iota;
	import std.algorithm.comparison : equal;

	auto q = new ConsumableQueue!int;
	foreach (i; 0 .. 14)
		q.put(i);
	assert(q.consume().equal(iota(14)));
	foreach (i; 0 .. 4)
		q.put(i);
	assert(q.consume().equal(iota(4)));
}

// A range that crosses the physical end of the storage must release all of
// its slots once it has been iterated completely.
unittest {
	auto q = new ConsumableQueue!int;
	foreach (i; 0 .. 14) q.put(i);
	foreach (i; 0 .. 14) {
		auto v = q.consumeOne;
		assert(v == i);
	}

	// occupies the storage indices 14, 15, 0 and 1 of the 16 slot buffer
	foreach (i; 0 .. 4) q.put(i);
	auto r = q.consume;
	assert(r.length == 4);
	foreach (i; 0 .. 4) {
		assert(r.front == i);
		r.popFront();
	}
	assert(r.empty);
	assert(q.m_consumedCount == 0 && q.m_pendingCount == 0);
}

// Outstanding ranges must stay valid when the storage grows, including
// ranges that start behind the wrap-around point.
unittest {
	auto q = new ConsumableQueue!int;
	foreach (i; 0 .. 14) q.put(i);
	foreach (i; 0 .. 14) {
		auto v = q.consumeOne;
		assert(v == i);
	}

	foreach (i; 0 .. 4) q.put(100 + i);
	auto r1 = q.consume; // storage indices 14, 15, 0, 1
	q.put(200);
	q.put(201);
	auto r2 = q.consume; // storage indices 2, 3
	assert(q.m_consumedCount == 6);

	// 6 consumed + 16 pending elements exceed the initial capacity of 16
	foreach (i; 0 .. 16) q.put(300 + i);
	assert(q.m_consumedCount == 6 && q.m_pendingCount == 16);

	foreach (i; 0 .. 4) {
		assert(r1.front == 100 + i);
		r1.popFront();
	}
	foreach (i; 0 .. 2) {
		assert(r2.front == 200 + i);
		r2.popFront();
	}
	assert(q.m_consumedCount == 0 && q.m_pendingCount == 16);

	auto r3 = q.consume;
	assert(r3.length == 16);
	foreach (i; 0 .. 16)
		assert(r3[i] == 300 + i);
}


/** Removes all pending elements of `q` for which `pred` returns `false`.

	The remaining pending elements keep their relative order. Elements that
	have already been consumed are not touched.
*/
void filterPending(alias pred, T)(ConsumableQueue!T q)
{
	size_t ir = 0;
	size_t iw = 0;

	while (ir < q.m_pendingCount) {
		if (pred(q.getPendingAt(ir))) {
			// compact the kept elements towards the front of the pending region
			if (ir != iw) q.getPendingAt(iw) = q.getPendingAt(ir);
			iw++;
		}
		ir++;
	}
	q.m_pendingCount = iw;
}


unittest {
	import std.algorithm.comparison : equal;
	import std.range : only;

	auto q = new ConsumableQueue!int;
	foreach (i; 0 .. 14) q.put(i);
	q.filterPending!(i => i % 2 != 0);
	assert(q.consume().equal(only(1, 3, 5, 7, 9, 11, 13)));

	foreach (i; 0 .. 14) q.put(i);
	q.filterPending!(i => i % 3 == 1);
	assert(q.consume().equal(only(1, 4, 7, 10, 13)));
}