1 module eventcore.drivers.posix.processes; 2 @safe: 3 4 import eventcore.driver; 5 import eventcore.drivers.posix.driver; 6 import eventcore.drivers.posix.signals; 7 import eventcore.internal.utils : nogc_assert, print; 8 9 import std.algorithm.comparison : among; 10 import std.variant : visit; 11 import std.stdint; 12 13 14 15 private enum SIGCHLD = 17; 16 17 final class PosixEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { 18 @safe: /*@nogc:*/ nothrow: 19 import core.sync.mutex : Mutex; 20 import core.sys.posix.unistd : dup; 21 import core.thread : Thread; 22 23 private { 24 static shared Mutex s_mutex; 25 static __gshared ProcessInfo[int] s_processes; 26 static __gshared Thread s_waitThread; 27 28 Loop m_loop; 29 // FIXME: avoid virtual funciton calls and use the final type instead 30 EventDriver m_driver; 31 uint m_validationCounter; 32 } 33 34 this(Loop loop, EventDriver driver) 35 { 36 m_loop = loop; 37 m_driver = driver; 38 } 39 40 void dispose() 41 { 42 } 43 44 final override ProcessID adopt(int system_pid) 45 { 46 ProcessInfo info; 47 info.exited = false; 48 info.refCount = 1; 49 info.validationCounter = ++m_validationCounter; 50 info.driver = this; 51 52 auto pid = ProcessID(system_pid, info.validationCounter); 53 add(system_pid, info); 54 return pid; 55 } 56 57 final override Process spawn( 58 string[] args, 59 ProcessStdinFile stdin, 60 ProcessStdoutFile stdout, 61 ProcessStderrFile stderr, 62 const string[string] env, 63 ProcessConfig config, 64 string working_dir) 65 @trusted { 66 // Use std.process to spawn processes 67 import std.process : pipe, Pid, spawnProcess, StdProcessConfig = Config; 68 import std.stdio : File; 69 static import std.stdio; 70 71 static File fdToFile(int fd, scope const(char)[] mode) 72 { 73 try { 74 File f; 75 f.fdopen(fd, mode); 76 return f; 77 } catch (Exception e) { 78 assert(0); 79 } 80 } 81 82 try { 83 Process process; 84 File stdinFile, stdoutFile, stderrFile; 85 86 stdinFile = stdin.visit!( 87 (int handle) => fdToFile(handle, "r"), 88 (ProcessRedirect redirect) { 89 final switch (redirect) { 90 case ProcessRedirect.inherit: return std.stdio.stdin; 91 case ProcessRedirect.none: return File.init; 92 case ProcessRedirect.pipe: 93 auto p = pipe(); 94 process.stdin = m_driver.pipes.adopt(dup(p.writeEnd.fileno)); 95 return p.readEnd; 96 } 97 }); 98 99 stdoutFile = stdout.visit!( 100 (int handle) => fdToFile(handle, "w"), 101 (ProcessRedirect redirect) { 102 final switch (redirect) { 103 case ProcessRedirect.inherit: return std.stdio.stdout; 104 case ProcessRedirect.none: return File.init; 105 case ProcessRedirect.pipe: 106 auto p = pipe(); 107 process.stdout = m_driver.pipes.adopt(dup(p.readEnd.fileno)); 108 return p.writeEnd; 109 } 110 }, 111 (_) => File.init); 112 113 stderrFile = stderr.visit!( 114 (int handle) => fdToFile(handle, "w"), 115 (ProcessRedirect redirect) { 116 final switch (redirect) { 117 case ProcessRedirect.inherit: return std.stdio.stderr; 118 case ProcessRedirect.none: return File.init; 119 case ProcessRedirect.pipe: 120 auto p = pipe(); 121 process.stderr = m_driver.pipes.adopt(dup(p.readEnd.fileno)); 122 return p.writeEnd; 123 } 124 }, 125 (_) => File.init); 126 127 const redirectStdout = stdout.convertsTo!ProcessStdoutRedirect; 128 const redirectStderr = stderr.convertsTo!ProcessStderrRedirect; 129 130 if (redirectStdout) { 131 assert(!redirectStderr, "Can't redirect both stdout and stderr"); 132 133 stdoutFile = stderrFile; 134 } else if (redirectStderr) { 135 stderrFile = stdoutFile; 136 } 137 138 Pid stdPid = spawnProcess( 139 args, 140 stdinFile, 141 stdoutFile, 142 stderrFile, 143 env, 144 cast(StdProcessConfig)config, 145 working_dir); 146 process.pid = adopt(stdPid.osHandle); 147 stdPid.destroy(); 148 149 return process; 150 } catch (Exception e) { 151 return Process.init; 152 } 153 } 154 155 final override void kill(ProcessID pid, int signal) 156 @trusted { 157 import core.sys.posix.signal : pkill = kill; 158 159 if (!isValid(pid)) return; 160 161 if (cast(int)pid > 0) 162 pkill(cast(int)pid, signal); 163 } 164 165 final override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) 166 { 167 bool exited; 168 int exitCode; 169 170 size_t id = size_t.max; 171 lockedProcessInfo(pid, (info) { 172 if (!info) return; 173 174 if (info.exited) { 175 exited = true; 176 exitCode = info.exitCode; 177 } else { 178 info.callbacks ~= on_process_exit; 179 id = info.callbacks.length - 1; 180 } 181 }); 182 183 if (exited) { 184 on_process_exit(pid, exitCode); 185 } 186 187 return id; 188 } 189 190 final override void cancelWait(ProcessID pid, size_t wait_id) 191 { 192 if (wait_id == size_t.max) return; 193 194 lockedProcessInfo(pid, (info) { 195 if (!info) return; 196 197 assert(!info.exited, "Cannot cancel wait when none are pending"); 198 assert(info.callbacks.length > wait_id, "Invalid process wait ID"); 199 200 info.callbacks[wait_id] = null; 201 }); 202 } 203 204 private void onProcessExit(int system_pid) 205 shared { 206 m_driver.core.runInOwnerThread(&onLocalProcessExit, system_pid); 207 } 208 209 private static void onLocalProcessExit(int system_pid) 210 { 211 int exitCode; 212 ProcessWaitCallback[] callbacks; 213 214 ProcessID pid; 215 216 PosixEventDriverProcesses driver; 217 lockedProcessInfoPlain(system_pid, (info) { 218 assert(info !is null); 219 220 exitCode = info.exitCode; 221 callbacks = info.callbacks; 222 pid = ProcessID(system_pid, info.validationCounter); 223 info.callbacks = null; 224 225 driver = info.driver; 226 }); 227 228 foreach (cb; callbacks) { 229 if (cb) 230 cb(pid, exitCode); 231 } 232 233 driver.releaseRef(pid); 234 } 235 236 final override bool hasExited(ProcessID pid) 237 { 238 bool ret; 239 lockedProcessInfo(pid, (info) { 240 if (info) ret = info.exited; 241 else ret = true; 242 }); 243 return ret; 244 } 245 246 override bool isValid(ProcessID handle) 247 const { 248 s_mutex.lock_nothrow(); 249 scope (exit) s_mutex.unlock_nothrow(); 250 auto info = () @trusted { return cast(int)handle.value in s_processes; } (); 251 return info && info.validationCounter == handle.validationCounter; 252 } 253 254 final override void addRef(ProcessID pid) 255 { 256 lockedProcessInfo(pid, (info) { 257 if (!info) return; 258 259 nogc_assert(info.refCount > 0, "Adding reference to unreferenced process FD."); 260 info.refCount++; 261 }); 262 } 263 264 final override bool releaseRef(ProcessID pid) 265 { 266 bool ret; 267 lockedProcessInfo(pid, (info) { 268 if (!info) { 269 ret = true; 270 return; 271 } 272 273 nogc_assert(info.refCount > 0, "Releasing reference to unreferenced process FD."); 274 if (--info.refCount == 0) { 275 // Remove/deallocate process 276 if (info.userDataDestructor) 277 () @trusted { info.userDataDestructor(info.userData.ptr); } (); 278 279 () @trusted { s_processes.remove(cast(int)pid.value); } (); 280 ret = false; 281 } else ret = true; 282 }); 283 return ret; 284 } 285 286 final protected override void* rawUserData(ProcessID pid, size_t size, DataInitializer initialize, DataInitializer destroy) 287 @system { 288 void* ret; 289 lockedProcessInfo(pid, (info) @safe nothrow { 290 assert(info.userDataDestructor is null || info.userDataDestructor is destroy, 291 "Requesting user data with differing type (destructor)."); 292 assert(size <= ProcessInfo.userData.length, "Requested user data is too large."); 293 294 if (!info.userDataDestructor) { 295 () @trusted { initialize(info.userData.ptr); } (); 296 info.userDataDestructor = destroy; 297 } 298 ret = () @trusted { return info.userData.ptr; } (); 299 }); 300 return ret; 301 } 302 303 package final @property size_t pendingCount() const nothrow @trusted { return s_processes.length; } 304 305 306 shared static this() 307 { 308 s_mutex = new shared Mutex; 309 } 310 311 private static void lockedProcessInfoPlain(int pid, scope void delegate(ProcessInfo*) nothrow @safe fn) 312 { 313 s_mutex.lock_nothrow(); 314 scope (exit) s_mutex.unlock_nothrow(); 315 auto info = () @trusted { return pid in s_processes; } (); 316 fn(info); 317 } 318 319 private static void lockedProcessInfo(ProcessID pid, scope void delegate(ProcessInfo*) nothrow @safe fn) 320 { 321 lockedProcessInfoPlain(cast(int)pid.value, (pi) { 322 fn(pi.validationCounter == pid.validationCounter ? pi : null); 323 }); 324 } 325 326 private static void add(int pid, ProcessInfo info) @trusted { 327 s_mutex.lock_nothrow(); 328 scope (exit) s_mutex.unlock_nothrow(); 329 330 if (!s_waitThread) { 331 s_waitThread = new Thread(&waitForProcesses); 332 s_waitThread.start(); 333 } 334 335 assert(pid !in s_processes, "Process adopted twice"); 336 s_processes[pid] = info; 337 } 338 339 private static void waitForProcesses() 340 @system { 341 import core.sys.posix.sys.wait : idtype_t, WNOHANG, WNOWAIT, WEXITED, WEXITSTATUS, WIFEXITED, WTERMSIG, waitid, waitpid; 342 import core.sys.posix.signal : siginfo_t; 343 344 while (true) { 345 siginfo_t dummy; 346 347 version (Android) { 348 // P_ALL is defined as 0 on android 349 auto ret = waitid(0, -1, &dummy, WEXITED|WNOWAIT); 350 } else { 351 auto ret = waitid(idtype_t.P_ALL, -1, &dummy, WEXITED|WNOWAIT); 352 } 353 354 if (ret == -1) { 355 { 356 s_mutex.lock_nothrow(); 357 scope (exit) s_mutex.unlock_nothrow(); 358 s_waitThread = null; 359 } 360 break; 361 } 362 363 int[] allprocs; 364 365 { 366 s_mutex.lock_nothrow(); 367 scope (exit) s_mutex.unlock_nothrow(); 368 369 370 () @trusted { 371 foreach (ref entry; s_processes.byKeyValue) { 372 if (!entry.value.exited) 373 allprocs ~= entry.key; 374 } 375 } (); 376 } 377 378 foreach (pid; allprocs) { 379 int status; 380 ret = () @trusted { return waitpid(pid, &status, WNOHANG); } (); 381 if (ret == pid) { 382 int exitstatus = WIFEXITED(status) ? WEXITSTATUS(status) : -WTERMSIG(status); 383 onProcessExitStatic(ret, exitstatus); 384 } 385 } 386 } 387 } 388 389 private static void onProcessExitStatic(int system_pid, int exit_status) 390 { 391 PosixEventDriverProcesses driver; 392 lockedProcessInfoPlain(system_pid, (ProcessInfo* info) @safe { 393 // We get notified of any child exiting, so ignore the ones we're 394 // not aware of 395 if (info is null) return; 396 397 // Increment the ref count to make sure it doesn't get removed 398 info.refCount++; 399 info.exited = true; 400 info.exitCode = exit_status; 401 driver = info.driver; 402 }); 403 404 if (driver) 405 () @trusted { return cast(shared)driver; } ().onProcessExit(system_pid); 406 } 407 408 private static struct ProcessInfo { 409 bool exited = true; 410 int exitCode; 411 ProcessWaitCallback[] callbacks; 412 size_t refCount = 0; 413 uint validationCounter; 414 PosixEventDriverProcesses driver; 415 416 DataInitializer userDataDestructor; 417 ubyte[16*size_t.sizeof] userData; 418 } 419 } 420 421 final class DummyEventDriverProcesses(Loop : PosixEventLoop) : EventDriverProcesses { 422 @safe: /*@nogc:*/ nothrow: 423 424 this(Loop loop, EventDriver driver) {} 425 426 void dispose() {} 427 428 override ProcessID adopt(int system_pid) 429 { 430 assert(false, "TODO!"); 431 } 432 433 override Process spawn(string[] args, ProcessStdinFile stdin, ProcessStdoutFile stdout, ProcessStderrFile stderr, const string[string] env, ProcessConfig config, string working_dir) 434 { 435 assert(false, "TODO!"); 436 } 437 438 override bool hasExited(ProcessID pid) 439 { 440 assert(false, "TODO!"); 441 } 442 443 override void kill(ProcessID pid, int signal) 444 { 445 assert(false, "TODO!"); 446 } 447 448 override size_t wait(ProcessID pid, ProcessWaitCallback on_process_exit) 449 { 450 assert(false, "TODO!"); 451 } 452 453 override void cancelWait(ProcessID pid, size_t waitId) 454 { 455 assert(false, "TODO!"); 456 } 457 458 override bool isValid(ProcessID handle) 459 const { 460 return false; 461 } 462 463 override void addRef(ProcessID pid) 464 { 465 assert(false, "TODO!"); 466 } 467 468 override bool releaseRef(ProcessID pid) 469 { 470 assert(false, "TODO!"); 471 } 472 473 protected override void* rawUserData(ProcessID descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) 474 @system { 475 assert(false, "TODO!"); 476 } 477 478 package final @property size_t pendingCount() const nothrow { return 0; } 479 }