1 /**
2 	BSD kqueue based event driver implementation.
3 
4 	Kqueue is an efficient API for asynchronous I/O on BSD flavors, including
5 	OS X/macOS, suitable for large numbers of concurrently open sockets.
6 */
7 module eventcore.drivers.posix.kqueue;
8 @safe: /*@nogc:*/ nothrow:
9 
/// `true` when compiling for one of the platforms this module enables kqueue
/// support for (FreeBSD, DragonFlyBSD or OS X/macOS), `false` otherwise.
enum have_kqueue = () {
	version (FreeBSD) return true;
	else version (DragonFlyBSD) return true;
	else version (OSX) return true;
	else return false;
} ();
14 
15 static if (have_kqueue):
16 
17 public import eventcore.drivers.posix.driver;
18 import eventcore.internal.utils;
19 
20 import core.time : Duration;
21 import core.sys.posix.sys.time : timespec, time_t;
22 
23 version (OSX) import core.sys.darwin.sys.event;
24 else version (FreeBSD) import core.sys.freebsd.sys.event;
25 else version (DragonFlyBSD) import core.sys.dragonflybsd.sys.event;
26 else static assert(false, "Kqueue not supported on this OS.");
27 
28 
29 import core.sys.linux.epoll;
30 
31 
/// POSIX event driver instantiated with `KqueueEventLoop` as its event loop.
alias KqueueEventDriver = PosixEventDriver!(KqueueEventLoop);
33 
/**
	Concrete kqueue event loop.

	All work is delegated to `KqueueEventLoopBase`; this class only supplies
	the `doProcessEvents` override.
*/
final class KqueueEventLoop : KqueueEventLoopBase {
	/// Forwards to `doProcessEventsBase` and returns its result unchanged.
	override bool doProcessEvents(Duration timeout)
	@trusted {
		const got_events = doProcessEventsBase(timeout);
		return got_events;
	}
}
40 
41 
/**
	Base class implementing the kqueue specific parts of the event loop.

	Registration changes are collected in a fixed-size change list. The list
	is handed to the kernel together with the next `kevent` wait call, or
	immediately once it is full.
*/
abstract class KqueueEventLoopBase : PosixEventLoop {
	protected {
		/// Descriptor of the kernel event queue, as returned by `kqueue()`.
		int m_queue;
		/// Number of valid entries at the start of `m_changes`.
		size_t m_changeCount = 0;
		/// Changes that have been queued but not yet submitted to the kernel.
		kevent_t[100] m_changes;
		/// Receives the events reported by a single `kevent` call.
		kevent_t[100] m_events;
	}

	/// Creates the kernel event queue used by this loop.
	this()
	@safe nothrow @nogc {
		m_queue = () @trusted { return kqueue(); } ();
		assert(m_queue >= 0, "Failed to create kqueue.");
	}

	/**
		Submits all pending changes and waits for events.

		Params:
			timeout = Maximum time to wait. `Duration.max` waits without a
				time limit.

		Returns:
			`true` if at least one event was reported or if `kevent` returned
			an error (including `EINTR`), `false` if the call returned without
			any events.
	*/
	protected bool doProcessEventsBase(Duration timeout)
	@trusted nothrow {
		//assert(Fiber.getThis() is null, "processEvents may not be called from within a fiber!");

		//print("wait %s", m_events.length);
		timespec ts;
		long secs, hnsecs;
		timeout.split!("seconds", "hnsecs")(secs, hnsecs);
		ts.tv_sec = cast(time_t)secs;
		ts.tv_nsec = cast(uint)hnsecs * 100;

		// a null timespec makes kevent wait without a time limit
		auto ret = kevent(m_queue, m_changes.ptr, cast(int)m_changeCount, m_events.ptr, cast(int)m_events.length, timeout == Duration.max ? null : &ts);
		// the pending changes have been handed to the kernel by the call above
		m_changeCount = 0;

		//print("kevent returned %s", ret);

		if (ret > 0) {
			foreach (ref evt; m_events[0 .. ret]) {
				//print("event %s %s", evt.ident, evt.filter, evt.flags);
				assert(evt.ident <= uint.max);
				auto fd = cast(size_t)evt.ident;
				if (evt.flags & (EV_EOF|EV_ERROR))
					notify!(EventType.status)(fd);
				switch (evt.filter) {
					default: break;
					case EVFILT_READ: notify!(EventType.read)(fd); break;
					case EVFILT_WRITE: notify!(EventType.write)(fd); break;
				}
				// EV_SIGNAL, EV_TIMEOUT
			}
			return true;
		} else {
			// NOTE: In particular, EINTR needs to cause true to be returned
			//       here, so that user code has a chance to handle any effects
			//       of the signal handler before waiting again.
			//
			//       Other errors are very likely to reoccur for the next
			//       loop iteration, so there is no value in attempting to
			//       wait again.
			return ret < 0;
		}
	}

	/// Disposes the base class and then closes the kernel event queue.
	override void dispose()
	{
		super.dispose();
		import core.sys.posix.unistd : close;
		close(m_queue);
	}

	/**
		Queues the addition of `fd` to the kernel event queue.

		Params:
			fd = Descriptor to watch.
			mask = Selects the read and/or write filter to add.
			edge_triggered = Adds `EV_CLEAR` to the event flags if `true`.
	*/
	override void registerFD(FD fd, EventMask mask, bool edge_triggered = true)
	{
		//print("register %s %s", fd, mask);
		kevent_t ev;
		ev.ident = fd;
		ev.flags = EV_ADD|EV_ENABLE;
		if (edge_triggered) ev.flags |= EV_CLEAR;
		if (mask & EventMask.read) {
			ev.filter = EVFILT_READ;
			putChange(ev);
		}
		if (mask & EventMask.write) {
			ev.filter = EVFILT_WRITE;
			putChange(ev);
		}
		//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
	}

	/**
		Queues the removal of `fd` from the kernel event queue.

		One `EV_DELETE` change is queued for each of the read and write
		filters selected by `mask`, mirroring `registerFD`.

		Params:
			fd = Descriptor to stop watching.
			mask = Selects the read and/or write filter to remove.
	*/
	override void unregisterFD(FD fd, EventMask mask)
	{
		kevent_t ev;
		ev.ident = fd;
		ev.flags = EV_DELETE;
		// kqueue identifies an event by its (ident, filter) pair, so each
		// registered filter needs its own delete request
		if (mask & EventMask.read) {
			ev.filter = EVFILT_READ;
			putChange(ev);
		}
		if (mask & EventMask.write) {
			ev.filter = EVFILT_WRITE;
			putChange(ev);
		}
	}

	/**
		Queues the filter changes needed to go from `old_mask` to `new_mask`.

		Params:
			fd = Descriptor whose registration is changed.
			old_mask = Event mask currently in effect.
			new_mask = Event mask that should be in effect afterwards.
			edge_triggered = Adds `EV_CLEAR` to the event flags if `true`.
	*/
	override void updateFD(FD fd, EventMask old_mask, EventMask new_mask, bool edge_triggered = true)
	{
		//print("update %s %s", fd, mask);
		kevent_t ev;
		// the change must target the given descriptor
		ev.ident = fd;
		// only the filters whose mask bit differs need to be touched
		auto changes = old_mask ^ new_mask;

		if (changes & EventMask.read) {
			ev.filter = EVFILT_READ;
			ev.flags = new_mask & EventMask.read ? EV_ADD : EV_DELETE;
			if (edge_triggered) ev.flags |= EV_CLEAR;
			putChange(ev);
		}

		if (changes & EventMask.write) {
			ev.filter = EVFILT_WRITE;
			ev.flags = new_mask & EventMask.write ? EV_ADD : EV_DELETE;
			if (edge_triggered) ev.flags |= EV_CLEAR;
			putChange(ev);
		}

		//if (mask & EventMask.status) ev.events |= EPOLLERR|EPOLLRDHUP;
	}

	/**
		Appends `ev` to the pending change list.

		When the list becomes full, it is submitted to the kernel right away
		(without retrieving any events) and then reset.
	*/
	private void putChange(ref kevent_t ev)
	@safe nothrow @nogc {
		m_changes[m_changeCount++] = ev;
		if (m_changeCount == m_changes.length) {
			auto ret = (() @trusted => kevent(m_queue, &m_changes[0], cast(int)m_changes.length, null, 0, null)) ();
			assert(ret == 0);
			m_changeCount = 0;
		}
	}
}