1 /**
2 	Base class for BSD socket based driver implementations.
3 
4 	See_also: `eventcore.drivers.select`, `eventcore.drivers.epoll`, `eventcore.drivers.kqueue`
5 */
6 module eventcore.drivers.posix.driver;
7 @safe: /*@nogc:*/ nothrow:
8 
9 public import eventcore.driver;
10 import eventcore.drivers.posix.dns;
11 import eventcore.drivers.posix.events;
12 import eventcore.drivers.posix.signals;
13 import eventcore.drivers.posix.sockets;
14 import eventcore.drivers.posix.watchers;
15 import eventcore.drivers.posix.processes;
16 import eventcore.drivers.posix.pipes;
17 import eventcore.drivers.timer;
18 import eventcore.drivers.threadedfile;
19 import eventcore.internal.consumablequeue : ConsumableQueue;
20 import eventcore.internal.utils;
21 
22 import core.time : MonoTime;
23 import std.algorithm.comparison : among, min, max;
24 import std.format : format;
25 
26 version (Posix) {
27 	package alias sock_t = int;
28 }
29 version (Windows) {
30 	package alias sock_t = size_t;
31 }
32 
33 private long currStdTime()
34 {
35 	import std.datetime : Clock;
36 	scope (failure) assert(false);
37 	return Clock.currStdTime;
38 }
39 
40 final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
41 @safe: /*@nogc:*/ nothrow:
42 
43 
44 	private {
45 		alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver);
46 		alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
47 		version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
48 		else alias SignalsDriver = DummyEventDriverSignals!Loop;
49 		alias TimerDriver = LoopTimeoutTimerDriver;
50 		alias SocketsDriver = PosixEventDriverSockets!Loop;
51 		version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
52 		else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
53 		else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
54 		alias FileDriver = ThreadedFileEventDriver!EventsDriver;
55 		version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
56 		else alias PipeDriver = DummyEventDriverPipes!Loop;
57 		version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
58 		else version (EventcoreCFRunLoopDriver) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
59 		else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
60 		version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop;
61 		else alias ProcessDriver = DummyEventDriverProcesses!Loop;
62 
63 		Loop m_loop;
64 		CoreDriver m_core;
65 		EventsDriver m_events;
66 		SignalsDriver m_signals;
67 		LoopTimeoutTimerDriver m_timers;
68 		SocketsDriver m_sockets;
69 		DNSDriver m_dns;
70 		FileDriver m_files;
71 		PipeDriver m_pipes;
72 		WatcherDriver m_watchers;
73 		ProcessDriver m_processes;
74 	}
75 
76 	this()
77 	@nogc @trusted {
78 		m_loop = mallocT!Loop;
79 		m_sockets = mallocT!SocketsDriver(m_loop);
80 		m_events = mallocT!EventsDriver(m_loop, m_sockets);
81 		m_signals = mallocT!SignalsDriver(m_loop);
82 		m_timers = mallocT!TimerDriver;
83 		m_pipes = mallocT!PipeDriver(m_loop);
84 		m_processes = mallocT!ProcessDriver(m_loop, this);
85 		m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
86 		m_dns = mallocT!DNSDriver(m_events, m_signals);
87 		m_files = mallocT!FileDriver(m_events);
88 		m_watchers = mallocT!WatcherDriver(m_events);
89 	}
90 
91 	// force overriding these in the (final) sub classes to avoid virtual calls
92 	final override @property inout(CoreDriver) core() inout { return m_core; }
93 	final override @property shared(inout(CoreDriver)) core() shared inout { return m_core; }
94 	final override @property inout(EventsDriver) events() inout { return m_events; }
95 	final override @property shared(inout(EventsDriver)) events() shared inout { return m_events; }
96 	final override @property inout(SignalsDriver) signals() inout { return m_signals; }
97 	final override @property inout(TimerDriver) timers() inout { return m_timers; }
98 	final override @property inout(SocketsDriver) sockets() inout { return m_sockets; }
99 	final override @property inout(DNSDriver) dns() inout { return m_dns; }
100 	final override @property inout(FileDriver) files() inout { return m_files; }
101 	final override @property inout(PipeDriver) pipes() inout { return m_pipes; }
102 	final override @property inout(WatcherDriver) watchers() inout { return m_watchers; }
103 	final override @property inout(ProcessDriver) processes() inout { return m_processes; }
104 
105 	final override bool dispose()
106 	{
107 		import core.thread : Thread;
108 		import taggedalgebraic : hasType;
109 
110 		if (!m_loop) return true;
111 
112 		static string getThreadName()
113 		{
114 			string thname;
115 			try thname = Thread.getThis().name;
116 			catch (Exception e) assert(false, e.msg);
117 			return thname.length ? thname : "unknown";
118 		}
119 
120 		bool hasPrintedHeader;
121 		foreach (id, ref s; m_loop.m_fds) {
122 			if (!s.specific.hasType!(typeof(null)) && !(s.common.flags & FDFlags.internal)
123 				&& (!s.specific.hasType!(StreamSocketSlot) || s.streamSocket.state == ConnectionState.connected))
124 			{
125 				if (!hasPrintedHeader) {
126 					print("Warning (thread: %s): leaking eventcore driver because there are still active handles", getThreadName());
127 					hasPrintedHeader = true;
128 				}
129 				print("  FD %s (%s)", id, s.specific.kind);
130 				debug (EventCoreLeakTrace) {
131 					import std.array : replace;
132 					string origin_str = s.common.origin.toString();
133 					print("    Created by;\n      %s",
134 						origin_str.replace("\n","\n      "));
135 				}
136 			}
137 		}
138 		debug (EventCoreLeakTrace) {}
139 		else {
140 			if (hasPrintedHeader)
141 					print("Use '-debug=EventCoreLeakTrace' to show where the instantiation happened");
142 		}
143 
144 		if (m_loop.m_handleCount > 0)
145 			return false;
146 
147 		m_processes.dispose();
148 		m_files.dispose();
149 		m_dns.dispose();
150 		m_core.dispose();
151 		m_loop.dispose();
152 
153 		try () @trusted {
154 				freeT(m_processes);
155 				freeT(m_watchers);
156 				freeT(m_pipes);
157 				freeT(m_files);
158 				freeT(m_dns);
159 				freeT(m_core);
160 				freeT(m_timers);
161 				freeT(m_signals);
162 				freeT(m_events);
163 				freeT(m_sockets);
164 				freeT(m_loop);
165 			} ();
166 		catch (Exception e) assert(false, e.msg);
167 
168 		return true;
169 	}
170 }
171 
172 
173 final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses) : EventDriverCore {
174 @safe nothrow:
175 	import core.atomic : atomicLoad, atomicStore;
176 	import core.sync.mutex : Mutex;
177 	import core.time : Duration;
178 	import std.stdint : intptr_t;
179 	import std.typecons : Tuple, tuple;
180 
181 	protected alias ExtraEventsCallback = bool delegate(long);
182 
183 	private alias ThreadCallbackEntry = Tuple!(ThreadCallbackGen, ThreadCallbackGenParams);
184 
185 	private {
186 		Loop m_loop;
187 		Timers m_timers;
188 		Events m_events;
189 		Processes m_processes;
190 		bool m_exit = false;
191 		EventID m_wakeupEvent;
192 
193 		shared Mutex m_threadCallbackMutex;
194 		ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
195 	}
196 
197 	this(Loop loop, Timers timers, Events events, Processes processes)
198 	@nogc {
199 		m_loop = loop;
200 		m_timers = timers;
201 		m_events = events;
202 		m_processes = processes;
203 		m_wakeupEvent = events.createInternal();
204         m_threadCallbackMutex = mallocT!(shared(Mutex));
205 		m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
206 		m_threadCallbacks.reserve(1000);
207 	}
208 
209 	final void dispose()
210 	{
211 		executeThreadCallbacks();
212 		m_events.releaseRef(m_wakeupEvent);
213 		m_wakeupEvent = EventID.invalid; // FIXME: this needs to be synchronized!
214 		try {
215 			() @trusted { freeT(m_threadCallbackMutex); } ();
216 			() @trusted { freeT(m_threadCallbacks); } ();
217 		} catch (Exception e) assert(false, e.msg);
218 	}
219 
220 	@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
221 
222 	final override ExitReason processEvents(Duration timeout)
223 	{
224 		import core.time : hnsecs, seconds;
225 
226 		executeThreadCallbacks();
227 
228 		if (m_exit) {
229 			m_exit = false;
230 			return ExitReason.exited;
231 		}
232 
233 		if (!waiterCount) {
234 			return ExitReason.outOfWaiters;
235 		}
236 
237 		bool got_events;
238 
239 		if (timeout <= 0.seconds) {
240 			got_events = m_loop.doProcessEvents(0.seconds);
241 			m_timers.process(MonoTime.currTime);
242 		} else {
243 			auto now = MonoTime.currTime;
244 			do {
245 				auto nextto = max(min(m_timers.getNextTimeout(now), timeout), 0.seconds);
246 				got_events = m_loop.doProcessEvents(nextto);
247 				auto prev_step = now;
248 				now = MonoTime.currTime;
249 				got_events |= m_timers.process(now);
250 				if (timeout != Duration.max)
251 					timeout -= now - prev_step;
252 			} while (timeout > 0.seconds && !m_exit && !got_events);
253 		}
254 
255 		executeThreadCallbacks();
256 
257 		if (m_exit) {
258 			m_exit = false;
259 			return ExitReason.exited;
260 		}
261 		if (!waiterCount) {
262 			return ExitReason.outOfWaiters;
263 		}
264 		if (got_events) return ExitReason.idle;
265 		return ExitReason.timeout;
266 	}
267 
268 	final override void exit()
269 	{
270 		m_exit = true; // FIXME: this needs to be synchronized!
271 		() @trusted { (cast(shared)m_events).trigger(m_wakeupEvent, true); } ();
272 	}
273 
274 	final override void clearExitFlag()
275 	{
276 		m_exit = false;
277 	}
278 
279 	final override void runInOwnerThread(ThreadCallbackGen del,
280 		ref ThreadCallbackGenParams params)
281 	shared {
282 		auto m = atomicLoad(m_threadCallbackMutex);
283 		auto evt = atomicLoad(m_wakeupEvent);
284 		// NOTE: This case must be handled gracefully to avoid hazardous
285 		//       race-conditions upon unexpected thread termination. The mutex
286 		//       and the map will stay valid even after the driver has been
287 		//       disposed, so no further synchronization is required.
288 		if (!m) return;
289 
290 		try {
291 			synchronized (m)
292 				() @trusted { return (cast()this).m_threadCallbacks; } ()
293 					.put(ThreadCallbackEntry(del, params));
294 		} catch (Exception e) assert(false, e.msg);
295 
296 		m_events.trigger(m_wakeupEvent, false);
297 	}
298 
299 	alias runInOwnerThread = EventDriverCore.runInOwnerThread;
300 
301 
302 	final protected override void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
303 	@system {
304 		return rawUserDataImpl(descriptor, size, initialize, destroy);
305 	}
306 
307 	final protected override void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
308 	@system {
309 		return rawUserDataImpl(descriptor, size, initialize, destroy);
310 	}
311 
312 	protected final void* rawUserDataImpl(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
313 	@system {
314 		return m_loop.rawUserDataImpl(descriptor, size, initialize, destroy);
315 	}
316 
317 	private void executeThreadCallbacks()
318 	{
319 		import std.stdint : intptr_t;
320 
321 		while (true) {
322 			ThreadCallbackEntry del;
323 			try {
324 				synchronized (m_threadCallbackMutex) {
325 					if (m_threadCallbacks.empty) break;
326 					del = m_threadCallbacks.consumeOne;
327 				}
328 			} catch (Exception e) assert(false, e.msg);
329 			del[0](del[1]);
330 		}
331 	}
332 }
333 
334 
335 package class PosixEventLoop {
336 @safe: nothrow:
337 	import core.time : Duration;
338 
339 	package {
340 		AlgebraicChoppedVector!(FDSlot, StreamSocketSlot, StreamListenSocketSlot, DgramSocketSlot, DNSSlot, WatcherSlot, EventSlot, SignalSlot, PipeSlot) m_fds;
341 		size_t m_handleCount = 0;
342 		size_t m_waiterCount = 0;
343 	}
344 
345 	protected @property int maxFD() const { return cast(int)m_fds.length; }
346 
347 	protected void dispose() @nogc { destroy(m_fds); }
348 
349 	/** Waits for and processes a single batch of events.
350 
351 		Returns:
352 			Returns `false` if no event was received before the timeout expired
353 			and `true` if either an event was received, or if the wait was
354 			interrupted by an error or signal.
355 	*/
356 	protected abstract bool doProcessEvents(Duration dur);
357 
358 	/// Registers the FD for general notification reception.
359 	protected abstract void registerFD(FD fd, EventMask mask, bool edge_triggered = true) @nogc;
360 	/// Unregisters the FD for general notification reception.
361 	protected abstract void unregisterFD(FD fd, EventMask mask) @nogc;
362 	/// Updates the event mask to use for listening for notifications.
363 	protected abstract void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true) @nogc;
364 
365 	final protected void notify(EventType evt)(size_t fd)
366 	{
367 		//assert(m_fds[fd].callback[evt] !is null, "Notifying FD which is not listening for event.");
368 		if (m_fds[fd].common.callback[evt]) {
369 			auto vc = m_fds[fd].common.validationCounter;
370 			m_fds[fd].common.callback[evt](FD(fd, vc));
371 		}
372 	}
373 
374 	final protected void enumerateFDs(EventType evt)(scope FDEnumerateCallback del)
375 	{
376 		// TODO: optimize!
377 		foreach (i; 0 .. cast(int)m_fds.length)
378 			if (m_fds[i].common.callback[evt])
379 				del(FD(i, m_fds[i].common.validationCounter));
380 	}
381 
382 	package void setNotifyCallback(EventType evt)(FD fd, FDSlotCallback callback)
383 	{
384 		assert(m_fds[fd.value].common.refCount > 0,
385 			"Setting notification callback on unreferenced file descriptor slot.");
386 		assert((callback !is null) != (m_fds[fd.value].common.callback[evt] !is null),
387 			"Overwriting notification callback.");
388 		// ensure that the FD doesn't get closed before the callback gets called.
389 		with (m_fds[fd.value]) {
390 			if (callback !is null) {
391 				if (!(common.flags & FDFlags.internal)) m_waiterCount++;
392 				common.refCount++;
393 			} else {
394 				common.refCount--;
395 				if (!(common.flags & FDFlags.internal)) m_waiterCount--;
396 			}
397 			common.callback[evt] = callback;
398 		}
399 	}
400 
401 	package FDType initFD(FDType, T)(size_t fd, FDFlags flags, auto ref T slot_init)
402 	{
403 		uint vc;
404 
405 		with (m_fds[fd]) {
406 			assert(common.refCount == 0, "Initializing referenced file descriptor slot.");
407 			assert(specific.kind == typeof(specific).Kind.none, "Initializing slot that has not been cleared.");
408 			common.refCount = 1;
409 			common.flags = flags;
410 			debug (EventCoreLeakTrace)
411 			{
412 				import core.runtime : defaultTraceHandler;
413 				common.origin = defaultTraceHandler(null);
414 			}
415 			specific = slot_init;
416 			vc = common.validationCounter;
417 		}
418 
419 		if (!(flags & FDFlags.internal))
420 			m_handleCount++;
421 
422 		return FDType(fd, vc);
423 	}
424 
425 	package void clearFD(T)(FD fd)
426 	{
427 		import taggedalgebraic : hasType;
428 
429 		auto slot = () @trusted { return &m_fds[fd.value]; } ();
430 		assert(slot.common.validationCounter == fd.validationCounter, "Clearing FD slot for invalid FD");
431 		assert(slot.specific.hasType!T, "Clearing file descriptor slot with unmatched type.");
432 
433 		if (!(slot.common.flags & FDFlags.internal))
434 			m_handleCount--;
435 
436 		if (slot.common.userDataDestructor)
437 			() @trusted { slot.common.userDataDestructor(slot.common.userData.ptr); } ();
438 		if (!(slot.common.flags & FDFlags.internal))
439 			foreach (cb; slot.common.callback)
440 				if (cb !is null)
441 					m_waiterCount--;
442 
443 		auto vc = slot.common.validationCounter;
444 		*slot = m_fds.FullField.init;
445 		slot.common.validationCounter = vc + 1;
446 	}
447 
448 	package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
449 	@system @nogc {
450 		FDSlot* fds = &m_fds[descriptor].common;
451 		assert(fds.userDataDestructor is null || fds.userDataDestructor is destroy,
452 			"Requesting user data with differing type (destructor).");
453 		assert(size <= FDSlot.userData.length, "Requested user data is too large.");
454 		if (size > FDSlot.userData.length) assert(false);
455 		if (!fds.userDataDestructor) {
456 			initialize(fds.userData.ptr);
457 			fds.userDataDestructor = destroy;
458 		}
459 		return fds.userData.ptr;
460 	}
461 }
462 
463 
464 alias FDEnumerateCallback = void delegate(FD);
465 
466 alias FDSlotCallback = void delegate(FD);
467 
468 private struct FDSlot {
469 	FDSlotCallback[EventType.max+1] callback;
470 	uint refCount;
471 	uint validationCounter;
472 	FDFlags flags;
473 
474 	DataInitializer userDataDestructor;
475 	ubyte[16*size_t.sizeof] userData;
476 	debug (EventCoreLeakTrace)
477 		Throwable.TraceInfo origin;
478 
479 	@property EventMask eventMask() const nothrow {
480 		EventMask ret = cast(EventMask)0;
481 		if (callback[EventType.read] !is null) ret |= EventMask.read;
482 		if (callback[EventType.write] !is null) ret |= EventMask.write;
483 		if (callback[EventType.status] !is null) ret |= EventMask.status;
484 		return ret;
485 	}
486 }
487 
488 enum FDFlags {
489 	none = 0,
490 	internal = 1<<0,
491 }
492 
493 enum EventType {
494 	read,
495 	write,
496 	status
497 }
498 
499 enum EventMask {
500 	read = 1<<0,
501 	write = 1<<1,
502 	status = 1<<2
503 }
504 
505 void log(ARGS...)(string fmt, ARGS args)
506 @trusted {
507 	import std.stdio : writef, writefln;
508 	import core.thread : Thread;
509 	try {
510 		writef("[%s]: ", Thread.getThis().name);
511 		writefln(fmt, args);
512 	} catch (Exception) {}
513 }
514 
515 
516 /*version (Windows) {
517 	import std.c.windows.windows;
518 	import std.c.windows.winsock;
519 
520 	alias EWOULDBLOCK = WSAEWOULDBLOCK;
521 
522 	extern(System) DWORD FormatMessageW(DWORD dwFlags, const(void)* lpSource, DWORD dwMessageId, DWORD dwLanguageId, LPWSTR lpBuffer, DWORD nSize, void* Arguments);
523 
524 	class WSAErrorException : Exception {
525 		int error;
526 
527 		this(string message, string file = __FILE__, size_t line = __LINE__)
528 		{
529 			error = WSAGetLastError();
530 			this(message, error, file, line);
531 		}
532 
533 		this(string message, int error, string file = __FILE__, size_t line = __LINE__)
534 		{
535 			import std.string : format;
536 			ushort* errmsg;
537 			FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER|FORMAT_MESSAGE_FROM_SYSTEM|FORMAT_MESSAGE_IGNORE_INSERTS,
538 						   null, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), cast(LPWSTR)&errmsg, 0, null);
539 			size_t len = 0;
540 			while (errmsg[len]) len++;
541 			auto errmsgd = (cast(wchar[])errmsg[0 .. len]).idup;
542 			LocalFree(errmsg);
543 			super(format("%s: %s (%s)", message, errmsgd, error), file, line);
544 		}
545 	}
546 
547 	alias SystemSocketException = WSAErrorException;
548 } else {
549 	import std.exception : ErrnoException;
550 	alias SystemSocketException = ErrnoException;
551 }
552 
553 T socketEnforce(T)(T value, lazy string msg = null, string file = __FILE__, size_t line = __LINE__)
554 {
555 	import std.exception : enforceEx;
556 	return enforceEx!SystemSocketException(value, msg, file, line);
557 }*/