1 module eventcore.drivers.posix.processes;
2 @safe:
3 
4 import eventcore.driver;
5 import eventcore.drivers.posix.driver;
6 import eventcore.drivers.posix.signals;
7 import eventcore.internal.utils : nogc_assert, print;
8 
9 import std.algorithm.comparison : among;
10 import std.variant : visit;
11 import std.stdint;
12 
13 
14 
15 private enum SIGCHLD = 17;
16 
17 final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
18 @safe: /*@nogc:*/ nothrow:
19 	import core.sync.mutex : Mutex;
20 	import core.sys.posix.unistd : dup;
21 	import core.thread : Thread;
22 
23 	private {
24 		static shared Mutex s_mutex;
25 		static __gshared ProcessInfo[int] s_processes;
26 		static __gshared Thread s_waitThread;
27 
28 		Loop m_loop;
29 		// FIXME: avoid virtual funciton calls and use the final type instead
30 		EventDriver m_driver;
31 		uint m_validationCounter;
32 	}
33 
34 	this(Loop loop, EventDriver driver)
35 	{
36 		m_loop = loop;
37 		m_driver = driver;
38 	}
39 
40 	void dispose()
41 	{
42 	}
43 
44 	final override ProcessID adopt(int system_pid)
45 	{
46 		ProcessInfo info;
47 		info.exited = false;
48 		info.refCount = 1;
49 		info.validationCounter = ++m_validationCounter;
50 		info.driver = this;
51 
52 		auto pid = ProcessID(system_pid, info.validationCounter);
53 		add(system_pid, info);
54 		return pid;
55 	}
56 
57 	final override Process spawn(
58 		string[] args,
59 		ProcessStdinFile stdin,
60 		ProcessStdoutFile stdout,
61 		ProcessStderrFile stderr,
62 		const string[string] env,
63 		ProcessConfig config,
64 		string working_dir)
65 	@trusted {
66 		// Use std.process to spawn processes
67 		import std.process : pipe, Pid, spawnProcess, StdProcessConfig = Config;
68 		import std.stdio : File;
69 		static import std.stdio;
70 
71 		static File fdToFile(int fd, scope const(char)[] mode)
72 		{
73 			try {
74 				File f;
75 				f.fdopen(fd, mode);
76 				return f;
77 			} catch (Exception e) {
78 				assert(0);
79 			}
80 		}
81 
82 		try {
83 			Process process;
84 			File stdinFile, stdoutFile, stderrFile;
85 
86 			stdinFile = stdin.visit!(
87 				(int handle) => fdToFile(handle, "r"),
88 				(ProcessRedirect redirect) {
89 					final switch (redirect) {
90 					case ProcessRedirect.inherit: return std.stdio.stdin;
91 					case ProcessRedirect.none: return File.init;
92 					case ProcessRedirect.pipe:
93 						auto p = pipe();
94 						process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno));
95 						return p.readEnd;
96 					}
97 				});
98 
99 			stdoutFile = stdout.visit!(
100 				(int handle) => fdToFile(handle, "w"),
101 				(ProcessRedirect redirect) {
102 					final switch (redirect) {
103 					case ProcessRedirect.inherit: return std.stdio.stdout;
104 					case ProcessRedirect.none: return File.init;
105 					case ProcessRedirect.pipe:
106 						auto p = pipe();
107 						process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno));
108 						return p.writeEnd;
109 					}
110 				},
111 				(_) => File.init);
112 
113 			stderrFile = stderr.visit!(
114 				(int handle) => fdToFile(handle, "w"),
115 				(ProcessRedirect redirect) {
116 					final switch (redirect) {
117 					case ProcessRedirect.inherit: return std.stdio.stderr;
118 					case ProcessRedirect.none: return File.init;
119 					case ProcessRedirect.pipe:
120 						auto p = pipe();
121 						process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno));
122 						return p.writeEnd;
123 					}
124 				},
125 				(_) => File.init);
126 
127 			const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect;
128 			const redirectStderr = stderr.convertsTo!ProcessStderrRedirect;
129 
130 			if (redirectStdout) {
131 				assert(!redirectStderr, "Can't redirect both stdout and stderr");
132 
133 				stdoutFile = stderrFile;
134 			} else if (redirectStderr) {
135 				stderrFile = stdoutFile;
136 			}
137 
138 			Pid stdPid = spawnProcess(
139 				args,
140 				stdinFile,
141 				stdoutFile,
142 				stderrFile,
143 				env,
144 				cast(StdProcessConfig)config,
145 				working_dir);
146 			process.pid = adopt(stdPid.osHandle);
147 			stdPid.destroy();
148 
149 			return process;
150 		} catch (Exception e) {
151 			return Process.init;
152 		}
153 	}
154 
155 	final override void kill(ProcessID pid, int signal)
156 	@trusted {
157 		import core.sys.posix.signal : pkill = kill;
158 
159 		if (!isValid(pid)) return;
160 
161 		if (cast(int)pid > 0)
162 			pkill(cast(int)pid, signal);
163 	}
164 
165 	final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
166 	{
167 		bool exited;
168 		int exitCode;
169 
170 		size_t id = size_t.max;
171 		lockedProcessInfo(pid, (info) {
172 			if (!info) return;
173 
174 			if (info.exited) {
175 				exited = true;
176 				exitCode = info.exitCode;
177 			} else {
178 				info.callbacks ~= on_process_exit;
179 				id = info.callbacks.length - 1;
180 			}
181 		});
182 
183 		if (exited) {
184 			on_process_exit(pid, exitCode);
185 		}
186 
187 		return id;
188 	}
189 
190 	final override void cancelWait(ProcessID pid, size_t wait_id)
191 	{
192 		if (wait_id == size_t.max) return;
193 
194 		lockedProcessInfo(pid, (info) {
195 			if (!info) return;
196 
197 			assert(!info.exited, "Cannot cancel wait when none are pending");
198 			assert(info.callbacks.length > wait_id, "Invalid process wait ID");
199 
200 			info.callbacks[wait_id] = null;
201 		});
202 	}
203 
204 	private void onProcessExit(int system_pid)
205 	shared {
206 		m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid);
207 	}
208 
209 	private static void onLocalProcessExit(int system_pid)
210 	{
211 		int exitCode;
212 		ProcessWaitCallback[] callbacks;
213 
214 		ProcessID pid;
215 
216 		PosixEventDriverProcesses driver;
217 		lockedProcessInfoPlain(system_pid, (info) {
218 			assert(info !is null);
219 
220 			exitCode = info.exitCode;
221 			callbacks = info.callbacks;
222 			pid = ProcessID(system_pid, info.validationCounter);
223 			info.callbacks = null;
224 
225 			driver = info.driver;
226 		});
227 
228 		foreach (cb; callbacks) {
229 			if (cb)
230 				cb(pid, exitCode);
231 		}
232 
233 		driver.releaseRef(pid);
234 	}
235 
236 	final override bool hasExited(ProcessID pid)
237 	{
238 		bool ret;
239 		lockedProcessInfo(pid, (info) {
240 			if (info) ret = info.exited;
241 			else ret = true;
242 		});
243 		return ret;
244 	}
245 
246 	override bool isValid(ProcessID handle)
247 	const {
248 		s_mutex.lock_nothrow();
249 		scope (exit) s_mutex.unlock_nothrow();
250 		auto info = () @trusted { return cast(int)handle.value in s_processes; } ();
251 		return info && info.validationCounter == handle.validationCounter;
252 	}
253 
254 	final override void addRef(ProcessID pid)
255 	{
256 		lockedProcessInfo(pid, (info) {
257 			if (!info) return;
258 
259 			nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD.");
260 			info.refCount++;
261 		});
262 	}
263 
264 	final override bool releaseRef(ProcessID pid)
265 	{
266 		bool ret;
267 		lockedProcessInfo(pid, (info) {
268 			if (!info) {
269 				ret = true;
270 				return;
271 			}
272 
273 			nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD.");
274 			if (--info.refCount == 0) {
275 				// Remove/deallocate process
276 				if (info.userDataDestructor)
277 					() @trusted { info.userDataDestructor(info.userData.ptr); } ();
278 
279 				() @trusted { s_processes.remove(cast(int)pid.value); } ();
280 				ret = false;
281 			} else ret = true;
282 		});
283 		return ret;
284 	}
285 
286 	final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy)
287 	@system {
288 		void* ret;
289 		lockedProcessInfo(pid, (info) @safe nothrow {
290 			assert(info.userDataDestructor is null || info.userDataDestructor is destroy,
291 				"Requesting user data with differing type (destructor).");
292 			assert(size <= ProcessInfo.userData.length, "Requested user data is too large.");
293 
294 			if (!info.userDataDestructor) {
295 				() @trusted { initialize(info.userData.ptr); } ();
296 				info.userDataDestructor = destroy;
297 			}
298 			ret = () @trusted { return info.userData.ptr; } ();
299 		});
300 		return ret;
301 	}
302 
303 	package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; }
304 
305 
306 	shared static this()
307 	{
308 		s_mutex = new shared Mutex;
309 	}
310 
311 	private static void lockedProcessInfoPlain(int pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
312 	{
313 		s_mutex.lock_nothrow();
314 		scope (exit) s_mutex.unlock_nothrow();
315 		auto info = () @trusted { return pid in s_processes; } ();
316 		fn(info);
317 	}
318 
319 	private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn)
320 	{
321 		lockedProcessInfoPlain(cast(int)pid.value, (pi) {
322 			fn(pi.validationCounter == pid.validationCounter ? pi : null);
323 		});
324 	}
325 
326 	private static void add(int pid, ProcessInfo info) @trusted {
327 		s_mutex.lock_nothrow();
328 		scope (exit) s_mutex.unlock_nothrow();
329 
330 		if (!s_waitThread) {
331 			s_waitThread = new Thread(&waitForProcesses);
332 			s_waitThread.start();
333 		}
334 
335 		assert(pid !in s_processes, "Process adopted twice");
336 		s_processes[pid] = info;
337 	}
338 
339 	private static void waitForProcesses()
340 	@system {
341 		import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid;
342 		import core.sys.posix.signal : siginfo_t;
343 
344 		while (true) {
345 			siginfo_t dummy;
346 
347 			version (Android) {
348 				// P_ALL is defined as 0 on android
349 				auto ret = waitid(0, -1, &dummy, WEXITED|WNOWAIT);
350 			} else {
351 				auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT);
352 			}
353 
354 			if (ret == -1) {
355 				{
356 					s_mutex.lock_nothrow();
357 					scope (exit) s_mutex.unlock_nothrow();
358 					s_waitThread = null;
359 				}
360 				break;
361 			}
362 
363 			int[] allprocs;
364 
365 			{
366 				s_mutex.lock_nothrow();
367 				scope (exit) s_mutex.unlock_nothrow();
368 
369 
370 				() @trusted {
371 					foreach (ref entry; s_processes.byKeyValue) {
372 						if (!entry.value.exited)
373 							allprocs ~= entry.key;
374 					}
375 				} ();
376 			}
377 
378 			foreach (pid; allprocs) {
379 				int status;
380 				ret = () @trusted { return waitpid(pid, &status, WNOHANG); } ();
381 				if (ret == pid) {
382 					int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status);
383 					onProcessExitStatic(ret, exitstatus);
384 				}
385 			}
386 		}
387 	}
388 
389 	private static void onProcessExitStatic(int system_pid, int exit_status)
390 	{
391 		PosixEventDriverProcesses driver;
392 		lockedProcessInfoPlain(system_pid, (ProcessInfo* info) @safe {
393 			// We get notified of any child exiting, so ignore the ones we're
394 			// not aware of
395 			if (info is null) return;
396 
397 			// Increment the ref count to make sure it doesn't get removed
398 			info.refCount++;
399 			info.exited = true;
400 			info.exitCode = exit_status;
401 			driver = info.driver;
402 		});
403 
404 		if (driver)
405 			() @trusted { return cast(shared)driver; } ().onProcessExit(system_pid);
406 	}
407 
408 	private static struct ProcessInfo {
409 		bool exited = true;
410 		int exitCode;
411 		ProcessWaitCallback[] callbacks;
412 		size_t refCount = 0;
413 		uint validationCounter;
414 		PosixEventDriverProcesses driver;
415 
416 		DataInitializer userDataDestructor;
417 		ubyte[16*size_t.sizeof] userData;
418 	}
419 }
420 
421 final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses {
422 @safe: /*@nogc:*/ nothrow:
423 
424 	this(Loop loop, EventDriver driver) {}
425 
426 	void dispose() {}
427 
428 	override ProcessID adopt(int system_pid)
429 	{
430 		assert(false, "TODO!");
431 	}
432 
433 	override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir)
434 	{
435 		assert(false, "TODO!");
436 	}
437 
438 	override bool hasExited(ProcessID pid)
439 	{
440 		assert(false, "TODO!");
441 	}
442 
443 	override void kill(ProcessID pid, int signal)
444 	{
445 		assert(false, "TODO!");
446 	}
447 
448 	override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit)
449 	{
450 		assert(false, "TODO!");
451 	}
452 
453 	override void cancelWait(ProcessID pid, size_t waitId)
454 	{
455 		assert(false, "TODO!");
456 	}
457 
458 	override bool isValid(ProcessID handle)
459 	const {
460 		return false;
461 	}
462 
463 	override void addRef(ProcessID pid)
464 	{
465 		assert(false, "TODO!");
466 	}
467 
468 	override bool releaseRef(ProcessID pid)
469 	{
470 		assert(false, "TODO!");
471 	}
472 
473 	protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy)
474 	@system {
475 		assert(false, "TODO!");
476 	}
477 
478 	package final @property size_t pendingCount() const nothrow { return 0; }
479 }