1 module eventcore.drivers.posix.pipes;
2 @safe:
3 
4 import eventcore.driver;
5 import eventcore.drivers.posix.driver;
6 import eventcore.internal.utils : nogc_assert, print;
7 
8 import std.algorithm : min, max;
9 
10 
11 final class PosixEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
12 @safe: /*@nogc:*/ nothrow:
13 	import core.stdc.errno : errno, EAGAIN;
14 	import core.sys.posix.unistd : close, read, write;
15 	import core.sys.posix.fcntl;
16 	import core.sys.posix.poll;
17 
18 	private Loop m_loop;
19 
20 	this(Loop loop)
21 	@nogc {
22 		m_loop = loop;
23 	}
24 
25 	final override PipeFD adopt(int system_fd)
26 	{
27 		if (m_loop.m_fds[system_fd].common.refCount) // FD already in use?
28 			return PipeFD.invalid;
29 
30         // Suprisingly cannot use O_CLOEXEC here, so use FD_CLOEXEC instead.
31         () @trusted { fcntl(system_fd, F_SETFL, fcntl(system_fd, F_GETFL) | O_NONBLOCK | FD_CLOEXEC); } ();
32 
33 		auto fd = m_loop.initFD!PipeFD(system_fd, FDFlags.none, PipeSlot.init);
34 		m_loop.registerFD(fd, EventMask.read|EventMask.write|EventMask.status);
35 		return fd;
36 	}
37 
38 	final override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
39 	{
40 		if (!isValid(pipe)) {
41 			on_read_finish(pipe, IOStatus.invalidHandle, 0);
42 			return;
43 		}
44 
45 		auto ret = () @trusted { return read(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
46 
47 		// Read failed
48 		if (ret < 0) {
49 			auto err = errno;
50 			if (err != EAGAIN) {
51 				print("Pipe error %s!", err);
52 				on_read_finish(pipe, IOStatus.error, 0);
53 				return;
54 			}
55 		}
56 
57 		// EOF
58 		if (ret == 0 && buffer.length > 0) {
59 			on_read_finish(pipe, IOStatus.disconnected, 0);
60 			return;
61 		}
62 
63 		// Handle immediate mode
64 		if (ret < 0 && mode == IOMode.immediate) {
65 			on_read_finish(pipe, IOStatus.wouldBlock, 0);
66 			return;
67 		}
68 
69 		// Handle successful read
70 		if (ret >= 0) {
71 			buffer = buffer[ret .. $];
72 
73 			// Handle completed read
74 			if (mode != IOMode.all || buffer.length == 0) {
75 				on_read_finish(pipe, IOStatus.ok, ret);
76 				return;
77 			}
78 		}
79 
80 		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
81 		assert(slot.readCallback is null, "Concurrent reads are not allowed.");
82 		slot.readCallback = on_read_finish;
83 		slot.readMode = mode;
84 		slot.bytesRead = max(ret, 0);
85 		slot.readBuffer = buffer;
86 
87 		// Need to use EventType.status as well, as pipes don't otherwise notify
88 		// of closes
89 		m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeRead);
90 		m_loop.setNotifyCallback!(EventType.status)(pipe, &onPipeRead);
91 	}
92 
93 	private void onPipeRead(FD fd)
94 	{
95 		auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
96 		auto pipe = cast(PipeFD)fd;
97 
98 		void finalize(IOStatus status)
99 		{
100 			addRef(pipe);
101 			scope(exit) releaseRef(pipe);
102 
103 			m_loop.setNotifyCallback!(EventType.read)(pipe, null);
104 			m_loop.setNotifyCallback!(EventType.status)(pipe, null);
105 			auto cb = slot.readCallback;
106 			slot.readCallback = null;
107 			slot.readBuffer = null;
108 			cb(pipe, status, slot.bytesRead);
109 		}
110 
111 		ssize_t ret = () @trusted { return read(cast(int)pipe, slot.readBuffer.ptr, min(slot.readBuffer.length, int.max)); } ();
112 
113 		// Read failed
114 		if (ret < 0) {
115 			auto err = errno;
116 			if (err != EAGAIN) {
117 				print("Pipe error %s!", err);
118 				finalize(IOStatus.error);
119 				return;
120 			}
121 		}
122 
123 		// EOF
124 		if (ret == 0 && slot.readBuffer.length > 0) {
125 			finalize(IOStatus.disconnected);
126 			return;
127 		}
128 
129 		// Successful read
130 		if (ret > 0 || !slot.readBuffer.length) {
131 			slot.readBuffer = slot.readBuffer[ret .. $];
132 			slot.bytesRead += ret;
133 
134 			// Handle completed read
135 			if (slot.readMode != IOMode.all || slot.readBuffer.length == 0) {
136 				finalize(IOStatus.ok);
137 				return;
138 			}
139 		}
140 	}
141 
142 	final override void cancelRead(PipeFD pipe)
143 	{
144 		if (!isValid(pipe)) return;
145 
146 		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
147 		assert(slot.readCallback !is null, "Cancelling read when there is no read in progress.");
148 		m_loop.setNotifyCallback!(EventType.read)(pipe, null);
149 		slot.readBuffer = null;
150 	}
151 
152 	final override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
153 	{
154 		if (!isValid(pipe)) {
155 			on_write_finish(pipe, IOStatus.invalidHandle, 0);
156 			return;
157 		}
158 
159 		if (buffer.length == 0) {
160 			on_write_finish(pipe, IOStatus.ok, 0);
161 			return;
162 		}
163 
164 		ssize_t ret = () @trusted { return write(cast(int)pipe, buffer.ptr, min(buffer.length, int.max)); } ();
165 
166 		if (ret < 0) {
167 			auto err = errno;
168 			if (err != EAGAIN) {
169 				on_write_finish(pipe, IOStatus.error, 0);
170 				return;
171 			}
172 
173 			if (mode == IOMode.immediate) {
174 				on_write_finish(pipe, IOStatus.wouldBlock, 0);
175 				return;
176 			}
177 		} else {
178 			buffer = buffer[ret .. $];
179 
180 			if (mode != IOMode.all || buffer.length == 0) {
181 				on_write_finish(pipe, IOStatus.ok, ret);
182 				return;
183 			}
184 		}
185 
186 		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
187 		assert(slot.writeCallback is null, "Concurrent writes not allowed.");
188 		slot.writeCallback = on_write_finish;
189 		slot.writeMode = mode;
190 		slot.bytesWritten = max(ret, 0);
191 		slot.writeBuffer = buffer;
192 
193 		m_loop.setNotifyCallback!(EventType.write)(pipe, &onPipeWrite);
194 	}
195 
196 	private void onPipeWrite(FD fd)
197 	{
198 		auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
199 		auto pipe = cast(PipeFD)fd;
200 
201 		void finalize(IOStatus status)
202 		{
203 			addRef(pipe);
204 			scope(exit) releaseRef(pipe);
205 
206 			m_loop.setNotifyCallback!(EventType.write)(pipe, null);
207 			auto cb = slot.writeCallback;
208 			slot.writeCallback = null;
209 			slot.writeBuffer = null;
210 			cb(pipe, status, slot.bytesWritten);
211 		}
212 
213 		ssize_t ret = () @trusted { return write(cast(int)pipe, slot.writeBuffer.ptr, min(slot.writeBuffer.length, int.max)); } ();
214 
215 		if (ret < 0) {
216 			auto err = errno;
217 			if (err != EAGAIN) {
218 				finalize(IOStatus.error);
219 			}
220 		} else {
221 			slot.bytesWritten += ret;
222 			slot.writeBuffer = slot.writeBuffer[ret .. $];
223 
224 			if (slot.writeMode != IOMode.all || slot.writeBuffer.length == 0) {
225 				finalize(IOStatus.ok);
226 			}
227 		}
228 
229 	}
230 
231 	final override void cancelWrite(PipeFD pipe)
232 	{
233 		if (!isValid(pipe)) return;
234 
235 		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
236 		assert(slot.writeCallback !is null, "Cancelling write when there is no write in progress.");
237 		m_loop.setNotifyCallback!(EventType.write)(pipe, null);
238 		slot.writeCallback = null;
239 		slot.writeBuffer = null;
240 	}
241 
242 	final override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
243 	{
244 		if (!isValid(pipe)) {
245 			on_data_available(pipe, IOStatus.invalidHandle, 0);
246 			return;
247 		}
248 
249 		if (pollPipe(pipe, on_data_available))
250 		{
251 			return;
252 		}
253 
254 		auto slot = () @trusted { return &m_loop.m_fds[pipe].pipe(); } ();
255 
256 		assert(slot.readCallback is null, "Concurrent reads are not allowed.");
257 		slot.readCallback = on_data_available;
258 		slot.readMode = IOMode.once; // currently meaningless
259 		slot.bytesRead = 0; // currently meaningless
260 		slot.readBuffer = null;
261 		m_loop.setNotifyCallback!(EventType.read)(pipe, &onPipeDataAvailable);
262 	}
263 
264 	private void onPipeDataAvailable(FD fd)
265 	{
266 		auto slot = () @trusted { return &m_loop.m_fds[fd].pipe(); } ();
267 		auto pipe = cast(PipeFD)fd;
268 
269 		auto callback = (PipeFD f, IOStatus s, size_t m) {
270 			addRef(f);
271 			scope(exit) releaseRef(f);
272 
273 			auto cb = slot.readCallback;
274 			slot.readCallback = null;
275 			slot.readBuffer = null;
276 			cb(f, s, m);
277 		};
278 
279 		if (pollPipe(pipe, callback))
280 		{
281 			m_loop.setNotifyCallback!(EventType.read)(pipe, null);
282 		}
283 	}
284 
285 	private bool pollPipe(PipeFD pipe, PipeIOCallback callback)
286 	@trusted {
287 		// Use poll to check if any data is available
288 		pollfd pfd;
289 		pfd.fd = cast(int)pipe;
290 		pfd.events = POLLIN;
291 		int ret = poll(&pfd, 1, 0);
292 
293 		if (ret == -1) {
294 			print("Error polling pipe: %s!", errno);
295 			callback(pipe, IOStatus.error, 0);
296 			return true;
297 		}
298 
299 		if (ret == 1) {
300 			callback(pipe, IOStatus.error, 0);
301 			return true;
302 		}
303 
304 		return false;
305 	}
306 
307 	final override void close(PipeFD pipe, PipeCloseCallback on_closed)
308 	{
309 		if (!isValid(pipe)) {
310 			on_closed(pipe, CloseStatus.invalidHandle);
311 			return;
312 		}
313 
314 		int res = close(cast(int)pipe);
315 		m_loop.unregisterFD(pipe, EventMask.read|EventMask.write|EventMask.status);
316 		m_loop.clearFD!PipeSlot(pipe);
317 
318 		if (on_closed)
319 			on_closed(pipe, res == 0 ? CloseStatus.ok : CloseStatus.ioError);
320 	}
321 
322 	override bool isValid(PipeFD handle)
323 	const {
324 		if (handle.value >= m_loop.m_fds.length) return false;
325 		return m_loop.m_fds[handle.value].common.validationCounter == handle.validationCounter;
326 	}
327 
328 	final override void addRef(PipeFD pipe)
329 	{
330 		if (!isValid(pipe)) return;
331 
332 		auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
333 		assert(slot.common.refCount > 0, "Adding reference to unreferenced pipe FD.");
334 		slot.common.refCount++;
335 	}
336 
337 	final override bool releaseRef(PipeFD pipe)
338 	{
339 		import taggedalgebraic : hasType;
340 
341 		if (!isValid(pipe)) return true;
342 
343 		auto slot = () @trusted { return &m_loop.m_fds[pipe]; } ();
344 		nogc_assert(slot.common.refCount > 0, "Releasing reference to unreferenced pipe FD.");
345 
346 		if (--slot.common.refCount == 0) {
347 			close(pipe, null);
348 			return false;
349 		}
350 		return true;
351 	}
352 
353 	final protected override void* rawUserData(PipeFD fd, size_t size, DataInitializer initialize, DataInitializer destroy)
354 	@system {
355 		return m_loop.rawUserDataImpl(fd, size, initialize, destroy);
356 	}
357 }
358 
359 final class DummyEventDriverPipes(Loop : PosixEventLoop) : EventDriverPipes {
360 @safe: /*@nogc:*/ nothrow:
361 	this(Loop loop) {}
362 
363 	override PipeFD adopt(int system_pipe_handle)
364 	{
365 		assert(false, "TODO!");
366 	}
367 
368 	override void read(PipeFD pipe, ubyte[] buffer, IOMode mode, PipeIOCallback on_read_finish)
369 	{
370 		assert(false, "TODO!");
371 	}
372 
373 	override void cancelRead(PipeFD pipe)
374 	{
375 		assert(false, "TODO!");
376 	}
377 
378 	override void write(PipeFD pipe, const(ubyte)[] buffer, IOMode mode, PipeIOCallback on_write_finish)
379 	{
380 		assert(false, "TODO!");
381 	}
382 
383 	override void cancelWrite(PipeFD pipe)
384 	{
385 		assert(false, "TODO!");
386 	}
387 
388 	override void waitForData(PipeFD pipe, PipeIOCallback on_data_available)
389 	{
390 		assert(false, "TODO!");
391 	}
392 
393 	override void close(PipeFD pipe, PipeCloseCallback on_closed)
394 	{
395 		if (!isValid(pipe)) {
396 			on_closed(pipe, CloseStatus.invalidHandle);
397 			return;
398 		}
399 
400 		assert(false, "TODO!");
401 	}
402 
403 	override bool isValid(PipeFD handle)
404 	const {
405 		return false;
406 	}
407 
408 	override void addRef(PipeFD pid)
409 	{
410 		assert(false, "TODO!");
411 	}
412 
413 	override bool releaseRef(PipeFD pid)
414 	{
415 		assert(false, "TODO!");
416 	}
417 
418 	protected override void* rawUserData(PipeFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
419 	@system {
420 		assert(false, "TODO!");
421 	}
422 }
423 
424 
425 package struct PipeSlot {
426 	alias Handle = PipeFD;
427 
428 	size_t bytesRead;
429 	ubyte[] readBuffer;
430 	IOMode readMode;
431 	PipeIOCallback readCallback;
432 
433 	size_t bytesWritten;
434 	const(ubyte)[] writeBuffer;
435 	IOMode writeMode;
436 	PipeIOCallback writeCallback;
437 }