1 module eventcore.drivers.posix.events;
2 @safe:
3 
4 import eventcore.driver;
5 import eventcore.drivers.posix.driver;
6 import eventcore.internal.consumablequeue : ConsumableQueue;
7 import eventcore.internal.utils : nogc_assert, mallocT, freeT;
8 
9 
10 version (linux) {
11 	nothrow @nogc extern (C) int eventfd(uint initval, int flags);
12     import core.sys.linux.sys.eventfd : EFD_NONBLOCK, EFD_CLOEXEC;
13 }
14 version (Posix) {
15 	import core.sys.posix.unistd : close, read, write;
16 } else {
17 	import core.sys.windows.winsock2 : closesocket, AF_INET, SOCKET, SOCK_DGRAM,
18 		bind, connect, getsockname, send, socket;
19 }
20 
21 
22 final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverSockets) : EventDriverEvents {
23 @safe: /*@nogc:*/ nothrow:
24 	private {
25 		Loop m_loop;
26 		Sockets m_sockets;
27 		ubyte[ulong.sizeof] m_buf;
28 	}
29 
30 	this(Loop loop, Sockets sockets)
31 	@nogc {
32 		m_loop = loop;
33 		m_sockets = sockets;
34 	}
35 
36 	package @property Loop loop() { return m_loop; }
37 
38 	final override EventID create()
39 	{
40 		return createInternal(false);
41 	}
42 
43 	package(eventcore) EventID createInternal(bool is_internal = true)
44 	@nogc {
45 		version (linux) {
46 			auto eid = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
47 			if (eid == -1) return EventID.invalid;
48 			// FIXME: avoid dynamic memory allocation for the queue
49 			auto id = m_loop.initFD!EventID(eid, FDFlags.internal,
50 				EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal));
51 			m_loop.registerFD(id, EventMask.read);
52 			m_loop.setNotifyCallback!(EventType.read)(id, &onEvent);
53 			releaseRef(id); // setNotifyCallback increments the reference count, but we need a value of 1 upon return
54 			assert(getRC(id) == 1);
55 			return id;
56 		} else {
57 			sock_t[2] fd;
58 			version (Posix) {
59 				import core.sys.posix.fcntl : fcntl, F_SETFL;
60 				import eventcore.drivers.posix.sockets : O_CLOEXEC;
61 
62 				// create a pair of sockets to communicate between threads
63 				import core.sys.posix.sys.socket : SOCK_DGRAM, AF_UNIX, socketpair;
64 				if (() @trusted { return socketpair(AF_UNIX, SOCK_DGRAM, 0, fd); } () != 0)
65 					return EventID.invalid;
66 
67 				assert(fd[0] != fd[1]);
68 
69 				// use the first socket as the async receiver
70 				auto s = m_sockets.adoptDatagramSocketInternal(fd[0], true, true);
71 
72 				() @trusted { fcntl(fd[1], F_SETFL, O_CLOEXEC); } ();
73 			} else {
74 				// fake missing socketpair support on Windows
75 				import std.socket : InternetAddress;
76 				scope addr = new InternetAddress(0x7F000001, 0);
77 				auto s = m_sockets.createDatagramSocketInternal(addr, null, DatagramCreateOptions.none, true);
78 				if (s == DatagramSocketFD.invalid) return EventID.invalid;
79 				fd[0] = cast(sock_t)s;
80 				if (!() @trusted {
81 					fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
82 					int nl = addr.nameLen;
83 					import eventcore.internal.utils : print;
84 					if (bind(fd[1], addr.name, addr.nameLen) != 0)
85 						return false;
86 					assert(nl == addr.nameLen);
87 					if (getsockname(fd[0], addr.name, &nl) != 0)
88 						return false;
89 					if (connect(fd[1], addr.name, addr.nameLen) != 0)
90 						return false;
91 					return true;
92 				} ())
93 				{
94 					m_sockets.releaseRef(s);
95 					return EventID.invalid;
96 				}
97 			}
98 
99 			m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
100 
101 			// use the second socket as the event ID and as the sending end for
102 			// other threads
103 			// FIXME: avoid dynamic memory allocation for the queue
104 			auto id = m_loop.initFD!EventID(fd[1], FDFlags.internal,
105 				EventSlot(mallocT!(ConsumableQueue!EventCallback), is_internal, s));
106 			assert(getRC(id) == 1);
107 			try m_sockets.userData!EventID(s) = id;
108 			catch (Exception e) assert(false, e.msg);
109 			return id;
110 		}
111 	}
112 
113 	final override void trigger(EventID event, bool notify_all)
114 	{
115 		if (!isValid(event)) return;
116 
117 		// make sure the event stays alive until all waiters have been notified.
118 		addRef(event);
119 		scope (exit) releaseRef(event);
120 
121 		auto slot = getSlot(event);
122 		if (notify_all) {
123 			//log("emitting only for this thread (%s waiters)", m_fds[event].waiters.length);
124 			foreach (w; slot.waiters.consume) {
125 				//log("emitting waiter %s %s", cast(void*)w.funcptr, w.ptr);
126 				if (!isInternal(event)) m_loop.m_waiterCount--;
127 				w(event);
128 			}
129 		} else {
130 			if (!slot.waiters.empty) {
131 				if (!isInternal(event)) m_loop.m_waiterCount--;
132 				slot.waiters.consumeOne()(event);
133 			}
134 		}
135 	}
136 
137 	final override void trigger(EventID event, bool notify_all)
138 	shared @trusted @nogc {
139 		import core.atomic : atomicStore;
140 		long count = notify_all ? long.max : 1;
141 		//log("emitting for all threads");
142 		version (Posix) .write(cast(int)event, &count, count.sizeof);
143 		else assert(send(cast(int)event, cast(const(ubyte*))&count, count.sizeof, 0) == count.sizeof);
144 	}
145 
146 	final override void wait(EventID event, EventCallback on_event)
147 	@nogc {
148 		if (!isValid(event)) return;
149 
150 		if (!isInternal(event)) m_loop.m_waiterCount++;
151 		getSlot(event).waiters.put(on_event);
152 	}
153 
154 	final override void cancelWait(EventID event, EventCallback on_event)
155 	{
156 		import std.algorithm.searching : countUntil;
157 		import std.algorithm.mutation : remove;
158 
159 		if (!isValid(event)) return;
160 
161 		if (!isInternal(event)) m_loop.m_waiterCount--;
162 		getSlot(event).waiters.removePending(on_event);
163 	}
164 
165 	version (linux) {
166 		private void onEvent(FD fd)
167 		@trusted {
168 			EventID event = cast(EventID)fd;
169 			ulong cnt;
170 			() @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
171 			trigger(event, cnt > 0);
172 		}
173 	} else {
174 		private void onSocketData(DatagramSocketFD s, IOStatus st, size_t, scope RefAddress)
175 		@nogc {
176 			// avoid infinite recursion in case of errors
177 			if (st == IOStatus.ok)
178 				m_sockets.receiveNoGC(s, m_buf, IOMode.once, &onSocketData);
179 
180 			try {
181 				EventID evt = m_sockets.userData!EventID(s);
182 				scope doit = {
183 					trigger(evt, (cast(long[])m_buf)[0] > 1);
184 				}; // cast to nogc
185 				() @trusted { (cast(void delegate() @nogc)doit)(); } ();
186 			} catch (Exception e) assert(false, e.msg);
187 		}
188 	}
189 
190 	override bool isValid(EventID handle)
191 	const {
192 		if (handle.value >= m_loop.m_fds.length) return false;
193 		return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
194 	}
195 
196 	final override void addRef(EventID descriptor)
197 	{
198 		if (!isValid(descriptor)) return;
199 
200 		assert(getRC(descriptor) > 0, "Adding reference to unreferenced event FD.");
201 		getRC(descriptor)++;
202 	}
203 
204 	final override bool releaseRef(EventID descriptor)
205 	@nogc {
206 		if (!isValid(descriptor)) return true;
207 
208 		nogc_assert(getRC(descriptor) > 0, "Releasing reference to unreferenced event FD.");
209 		if (--getRC(descriptor) == 0) {
210 			if (!isInternal(descriptor))
211 		 		m_loop.m_waiterCount -= getSlot(descriptor).waiters.length;
212 			() @trusted nothrow {
213 				try freeT(getSlot(descriptor).waiters);
214 				catch (Exception e) nogc_assert(false, e.msg);
215 			} ();
216 			version (linux) {
217 				m_loop.unregisterFD(descriptor, EventMask.read);
218 			} else {
219 				auto rs = getSlot(descriptor).recvSocket;
220 				m_sockets.cancelReceive(rs);
221 				m_sockets.releaseRef(rs);
222 			}
223 			m_loop.clearFD!EventSlot(descriptor);
224 			version (Posix) close(cast(int)descriptor);
225 			else () @trusted { closesocket(cast(SOCKET)descriptor); } ();
226 			return false;
227 		}
228 		return true;
229 	}
230 
231 	final protected override void* rawUserData(EventID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
232 	@system {
233 		if (!isValid(descriptor)) return null;
234 		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
235 	}
236 
237 	private EventSlot* getSlot(EventID id)
238 	@nogc {
239 		nogc_assert(id < m_loop.m_fds.length, "Invalid event ID.");
240 		return () @trusted @nogc { return &m_loop.m_fds[id].event(); } ();
241 	}
242 
243 	private ref uint getRC(EventID id)
244 	{
245 		return m_loop.m_fds[id].common.refCount;
246 	}
247 
248 	private bool isInternal(EventID id)
249 	@nogc {
250 		return getSlot(id).isInternal;
251 	}
252 }
253 
254 package struct EventSlot {
255 	alias Handle = EventID;
256 	ConsumableQueue!EventCallback waiters;
257 	bool isInternal;
258 	version (linux) {}
259 	else {
260 		DatagramSocketFD recvSocket;
261 	}
262 }